Changeset - 78de1ebfd99d
[Not reviewed]
0 4 0
Christopher Esterhuyse - 5 years ago 2020-02-21 14:23:42
christopher.esterhuyse@gmail.com
more detailed debug printing
4 files changed with 81 insertions and 40 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
[package]
 
name = "reowolf_rs"
 
version = "0.1.1"
 
authors = [
 
	"Christopher Esterhuyse <christopher.esterhuyse@gmail.com>",
 
	"Hans-Dieter Hiep <hdh@cwi.nl>"
 
]
 
edition = "2018"
 

	
 
[dependencies]
 
# hibitset = "0.6.2"
 

	
 
# runtime stuff
 
derive_more = "0.99.2"
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 
take_mut = "0.2.2"
 
maplit = "1.0.2" # convenience macros
 
indexmap = "1.3.0" # hashsets/hashmaps with efficient arbitrary element removal
 

	
 
# network stuff
 
integer-encoding = "1.0.7"
 
byteorder = "1.3.2"
 
mio = "0.6.21" # migrate to mio 0.7.0 when it stabilizes. It's much better.
 
mio-extras = "2.0.6"
 

	
 
# protocol stuff
 
id-arena = "2.2.1"
 
backtrace = "0.3"
 

	
 
[dev-dependencies]
 
test-generator = "0.3.0"
 
crossbeam-utils = "0.7.0"
 
lazy_static = "1.4.0"
 

	
 
[lib]
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi"]
 
ffi = [] # no feature dependencies
 
\ No newline at end of file
src/runtime/experimental/api.rs
Show inline comments
 
use super::bits::{usizes_for_bits, BitChunkIter, BitMatrix};
 
use super::vec_storage::VecStorage;
 
use crate::common::*;
 
use crate::runtime::endpoint::EndpointExt;
 
use crate::runtime::endpoint::EndpointInfo;
 
use crate::runtime::endpoint::{Endpoint, Msg, SetupMsg};
 
use crate::runtime::errors::EndpointErr;
 
use crate::runtime::errors::MessengerRecvErr;
 
use crate::runtime::errors::PollDeadlineErr;
 
use crate::runtime::MessengerState;
 
use crate::runtime::Messengerlike;
 
use crate::runtime::ReceivedMsg;
 
use crate::runtime::{ProtocolD, ProtocolS};
 

	
 
use std::net::SocketAddr;
 
use std::sync::Arc;
 

	
 
pub enum Coupling {
 
    Active,
 
    Passive,
 
}
 

	
 
#[derive(Debug)]
 
struct Family {
 
    parent: Option<Port>,
 
    children: HashSet<Port>,
 
}
 

	
 
pub struct Binding {
 
    pub coupling: Coupling,
 
    pub polarity: Polarity,
 
    pub addr: SocketAddr,
 
}
 

	
 
pub struct InPort(Port); // InPort and OutPort are AFFINE (exposed to Rust API)
 
pub struct OutPort(Port);
 
impl From<InPort> for Port {
 
    fn from(x: InPort) -> Self {
 
        x.0
 
    }
 
}
 
impl From<OutPort> for Port {
 
    fn from(x: OutPort) -> Self {
 
        x.0
 
    }
 
}
 

	
 
#[derive(Default, Debug)]
 
struct ChannelIndexStream {
 
    next: u32,
 
}
 
impl ChannelIndexStream {
 
    fn next(&mut self) -> u32 {
 
        self.next += 1;
 
        self.next - 1
 
    }
 
}
 

	
 
enum Connector {
 
    Connecting(Connecting),
 
    Connected(Connected),
 
}
 

	
 
#[derive(Default)]
 
pub struct Connecting {
 
    bindings: Vec<Binding>,
 
}
 
trait Binds<T> {
 
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> T;
 
}
 
impl Binds<InPort> for Connecting {
 
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> InPort {
 
        self.bindings.push(Binding { coupling, polarity: Polarity::Getter, addr });
 
        InPort(Port(self.bindings.len() - 1))
 
    }
 
}
 
impl Binds<OutPort> for Connecting {
 
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> OutPort {
 
        self.bindings.push(Binding { coupling, polarity: Polarity::Putter, addr });
 
        OutPort(Port(self.bindings.len() - 1))
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum ConnectErr {
 
    BindErr(SocketAddr),
 
    NewSocketErr(SocketAddr),
 
    AcceptErr(SocketAddr),
 
    ConnectionShutdown(SocketAddr),
 
    PortKindMismatch(Port, SocketAddr),
 
    EndpointErr(Port, EndpointErr),
 
    PollInitFailed,
 
    PollingFailed,
 
    Timeout,
 
}
 

	
 
#[derive(Debug)]
 
struct Component {
 
    protocol: Arc<ProtocolD>,
 
    port_set: HashSet<Port>,
 
    identifier: Arc<[u8]>,
 
    state: ProtocolS,
 
}
 

	
 
impl From<PollDeadlineErr> for ConnectErr {
 
    fn from(e: PollDeadlineErr) -> Self {
 
        use PollDeadlineErr as P;
 
        match e {
 
            P::PollingFailed => Self::PollingFailed,
 
            P::Timeout => Self::Timeout,
 
        }
 
    }
 
}
 
impl From<MessengerRecvErr> for ConnectErr {
 
    fn from(e: MessengerRecvErr) -> Self {
 
        use MessengerRecvErr as M;
 
        match e {
 
            M::PollingFailed => Self::PollingFailed,
 
            M::EndpointErr(port, err) => Self::EndpointErr(port, err),
 
        }
 
    }
 
}
 
impl Connecting {
 
    fn random_controller_id() -> ControllerId {
 
        type Bytes8 = [u8; std::mem::size_of::<ControllerId>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        unsafe {
 
            // safe:
 
            // 1. All random bytes give valid Bytes8
 
            // 2. Bytes8 and ControllerId have same valid representations
 
            std::mem::transmute::<Bytes8, ControllerId>(bytes)
 
        }
 
    }
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
        use std::io::Write;
 
        stream.write(&[]).is_ok()
 
    }
 
    fn new_connected(
 
        &self,
 
        controller_id: ControllerId,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        ///////////////////////////////////////////////////////
 
        // 1. bindings correspond with ports 0..bindings.len(). For each:
 
        //    - reserve a slot in endpoint_exts.
 
        //    - store the port in `native_ports' set.
 
        let mut endpoint_exts = VecStorage::<EndpointExt>::with_reserved_range(self.bindings.len());
 
        let native_ports = (0..self.bindings.len()).map(Port).collect();
 

	
 
        // 2. create MessengerState structure for polling channels
 
        let edge = PollOpt::edge();
 
        let [ready_r, ready_w] = [Ready::readable(), Ready::writable()];
 
        let mut ms =
 
            MessengerState::with_event_capacity(self.bindings.len()).map_err(|_| PollInitFailed)?;
 

	
 
        // 3. create one TODO task per (port,binding) as a vector with indices in lockstep.
 
        //    we will drain it gradually so we store elements of type Option<Todo> where all are initially Some(_)
 
        enum Todo {
 
            PassiveAccepting { listener: TcpListener, channel_id: ChannelId },
 
            ActiveConnecting { stream: TcpStream },
 
            PassiveConnecting { stream: TcpStream, channel_id: ChannelId },
 
            ActiveRecving { endpoint: Endpoint },
 
        }
 
        let mut channel_index_stream = ChannelIndexStream::default();
 
        let mut todos = self
 
            .bindings
 
            .iter()
 
            .enumerate()
 
            .map(|(index, binding)| {
 
                Ok(Some(match binding.coupling {
 
                    Coupling::Passive => {
 
                        let channel_index = channel_index_stream.next();
 
                        let channel_id = ChannelId { controller_id, channel_index };
 
                        let listener =
 
                            TcpListener::bind(&binding.addr).map_err(|_| BindErr(binding.addr))?;
 
                        ms.poll.register(&listener, Token(index), ready_r, edge).unwrap(); // registration unique
 
                        Todo::PassiveAccepting { listener, channel_id }
 
                    }
 
                    Coupling::Active => {
 
                        let stream = TcpStream::connect(&binding.addr)
 
                            .map_err(|_| NewSocketErr(binding.addr))?;
 
                        ms.poll.register(&stream, Token(index), ready_w, edge).unwrap(); // registration unique
 
                        Todo::ActiveConnecting { stream }
 
                    }
 
                }))
 
            })
 
            .collect::<Result<Vec<Option<Todo>>, ConnectErr>>()?;
 
        let mut num_todos_remaining = todos.len();
 

	
 
        // 4. handle incoming events until all TODOs are completed OR we timeout
 
@@ -221,447 +222,467 @@ impl Connecting {
 
                            backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3;
 
                            let stream = TcpStream::connect(&binding.addr).unwrap();
 
                            ms.poll.register(&stream, token, ready_w, edge).expect("PAC 3");
 
                            Todo::ActiveConnecting { stream }
 
                        };
 
                        todos[index] = Some(todo);
 
                    }
 
                    Some(Todo::PassiveConnecting { mut stream, channel_id }) => {
 
                        if !Self::test_stream_connectivity(&mut stream) {
 
                            return Err(ConnectionShutdown(binding.addr));
 
                        }
 
                        ms.poll.reregister(&stream, token, ready_r, edge).expect("55");
 
                        let polarity = binding.polarity;
 
                        let info = EndpointInfo { polarity, channel_id };
 
                        let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                        let mut endpoint = Endpoint::from_fresh_stream(stream);
 
                        endpoint.send(msg).map_err(|e| EndpointErr(Port(index), e))?;
 
                        let endpoint_ext = EndpointExt { endpoint, info };
 
                        endpoint_exts.occupy_reserved(index, endpoint_ext);
 
                        num_todos_remaining -= 1;
 
                    }
 
                    Some(Todo::ActiveRecving { mut endpoint }) => {
 
                        let ekey = Port(index);
 
                        'recv_loop: while let Some(msg) =
 
                            endpoint.recv().map_err(|e| EndpointErr(ekey, e))?
 
                        {
 
                            if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg {
 
                                if info.polarity == binding.polarity {
 
                                    return Err(PortKindMismatch(ekey, binding.addr));
 
                                }
 
                                let channel_id = info.channel_id;
 
                                let info = EndpointInfo { polarity: binding.polarity, channel_id };
 
                                ms.polled_undrained.insert(ekey);
 
                                let endpoint_ext = EndpointExt { endpoint, info };
 
                                endpoint_exts.occupy_reserved(index, endpoint_ext);
 
                                num_todos_remaining -= 1;
 
                                break 'recv_loop;
 
                            } else {
 
                                ms.delayed.push(ReceivedMsg { recipient: ekey, msg });
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        assert_eq!(None, endpoint_exts.iter_reserved().next());
 
        drop(todos);
 

	
 
        ///////////////////////////////////////////////////////
 
        // 1. construct `family', i.e. perform the sink tree setup procedure
 
        use {Msg::SetupMsg as S, SetupMsg::*};
 
        let mut messenger = (&mut ms, &mut endpoint_exts);
 
        impl Messengerlike for (&mut MessengerState, &mut VecStorage<EndpointExt>) {
 
            fn get_state_mut(&mut self) -> &mut MessengerState {
 
                self.0
 
            }
 
            fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint {
 
                &mut self
 
                    .1
 
                    .get_occupied_mut(ekey.to_raw() as usize)
 
                    .expect("OUT OF BOUNDS")
 
                    .endpoint
 
            }
 
        }
 

	
 
        // 1. broadcast my ID as the first echo. await reply from all in net_keylist
 
        let neighbors = (0..self.bindings.len()).map(Port);
 
        let echo = S(LeaderEcho { maybe_leader: controller_id });
 
        let mut awaiting = IndexSet::<Port>::with_capacity(neighbors.len());
 
        for n in neighbors.clone() {
 
            messenger.send(n, echo.clone()).map_err(|e| EndpointErr(n, e))?;
 
            awaiting.insert(n);
 
        }
 

	
 
        // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
        //    adopt it as leader, sender as parent, and reset the await set.
 
        let mut parent: Option<Port> = None;
 
        let mut my_leader = controller_id;
 
        messenger.undelay_all();
 
        'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
            let ReceivedMsg { recipient, msg } = messenger.recv_until(deadline)?.ok_or(Timeout)?;
 
            match msg {
 
                S(LeaderAnnounce { leader }) => {
 
                    // someone else completed the echo and became leader first!
 
                    // the sender is my parent
 
                    parent = Some(recipient);
 
                    my_leader = leader;
 
                    awaiting.clear();
 
                    break 'echo_loop;
 
                }
 
                S(LeaderEcho { maybe_leader }) => {
 
                    use Ordering::*;
 
                    match maybe_leader.cmp(&my_leader) {
 
                        Less => { /* ignore */ }
 
                        Equal => {
 
                            awaiting.remove(&recipient);
 
                            if awaiting.is_empty() {
 
                                if let Some(p) = parent {
 
                                    // return the echo to my parent
 
                                    messenger
 
                                        .send(p, S(LeaderEcho { maybe_leader }))
 
                                        .map_err(|e| EndpointErr(p, e))?;
 
                                } else {
 
                                    // DECIDE!
 
                                    break 'echo_loop;
 
                                }
 
                            }
 
                        }
 
                        Greater => {
 
                            // join new echo
 
                            parent = Some(recipient);
 
                            my_leader = maybe_leader;
 
                            let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                            awaiting.clear();
 
                            if neighbors.len() == 1 {
 
                                // immediately reply to parent
 
                                messenger
 
                                    .send(recipient, echo.clone())
 
                                    .map_err(|e| EndpointErr(recipient, e))?;
 
                            } else {
 
                                for n in neighbors.clone() {
 
                                    if n != recipient {
 
                                        messenger
 
                                            .send(n, echo.clone())
 
                                            .map_err(|e| EndpointErr(n, e))?;
 
                                        awaiting.insert(n);
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 
                msg => messenger.delay(ReceivedMsg { recipient, msg }),
 
            }
 
        }
 
        match parent {
 
            None => assert_eq!(
 
                my_leader, controller_id,
 
                "I've got no parent, but I consider {:?} the leader?",
 
                my_leader
 
            ),
 
            Some(parent) => assert_ne!(
 
                my_leader, controller_id,
 
                "I have {:?} as parent, but I consider myself ({:?}) the leader?",
 
                parent, controller_id
 
            ),
 
        }
 

	
 
        // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
        //    in this loop, every node sends 1 message to each neighbor
 
        let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
        for n in neighbors.clone() {
 
            let msg =
 
                if Some(n) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() };
 
            messenger.send(n, msg).map_err(|e| EndpointErr(n, e))?;
 
        }
 

	
 
        // await 1 message from all non-parents
 
        for n in neighbors.clone() {
 
            if Some(n) != parent {
 
                awaiting.insert(n);
 
            }
 
        }
 
        let mut children = HashSet::default();
 
        messenger.undelay_all();
 
        while !awaiting.is_empty() {
 
            let ReceivedMsg { recipient, msg } = messenger.recv_until(deadline)?.ok_or(Timeout)?;
 
            let recipient = recipient;
 
            match msg {
 
                S(YouAreMyParent) => {
 
                    assert!(awaiting.remove(&recipient));
 
                    children.insert(recipient);
 
                }
 
                S(SetupMsg::LeaderAnnounce { leader }) => {
 
                    assert!(awaiting.remove(&recipient));
 
                    assert!(leader == my_leader);
 
                    assert!(Some(recipient) != parent);
 
                    // they wouldn't send me this if they considered me their parent
 
                }
 
                _ => messenger.delay(ReceivedMsg { recipient, msg }),
 
            }
 
        }
 
        let family = Family { parent, children };
 

	
 
        // done!
 
        Ok(Connected {
 
            components: Default::default(),
 
            controller_id,
 
            channel_index_stream,
 
            endpoint_exts,
 
            native_ports,
 
            family,
 
            ephemeral: Default::default(),
 
        })
 
    }
 
    /////////
 
    pub fn connect_using_id(
 
        &mut self,
 
        controller_id: ControllerId,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ConnectErr> {
 
        // 1. try and create a connection from these bindings with self immutable.
 
        let connected = self.new_connected(controller_id, timeout)?;
 
        // 2. success! drain self and return
 
        self.bindings.clear();
 
        Ok(connected)
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<Connected, ConnectErr> {
 
        self.connect_using_id(Self::random_controller_id(), timeout)
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_ports: HashSet<Port>,
 
    controller_id: ControllerId,
 
    channel_index_stream: ChannelIndexStream,
 
    endpoint_exts: VecStorage<EndpointExt>,
 
    components: VecStorage<Component>,
 
    family: Family,
 
    ephemeral: Ephemeral,
 
}
 
#[derive(Debug, Default)]
 
struct Ephemeral {
 
    bit_matrix: BitMatrix,
 
}
 
impl Connected {
 
    pub fn new_component(
 
        &mut self,
 
        protocol: &Arc<ProtocolD>,
 
        identifier: &Arc<[u8]>,
 
        moved_port_list: &[Port],
 
    ) -> Result<(), MainComponentErr> {
 
        //////////////////////////////////////////
 
        // 1. try and create a new component (without mutating self)
 
        use MainComponentErr::*;
 
        let moved_port_set = {
 
            let mut set: HashSet<Port> = Default::default();
 
            for &port in moved_port_list.iter() {
 
                if !self.native_ports.contains(&port) {
 
                    return Err(CannotMovePort(port));
 
                }
 
                if !set.insert(port) {
 
                    return Err(DuplicateMovedPort(port));
 
                }
 
            }
 
            set
 
        };
 
        // moved_port_set is disjoint to native_ports
 
        let expected_polarities = protocol.component_polarities(identifier)?;
 
        if moved_port_list.len() != expected_polarities.len() {
 
            return Err(WrongNumberOfParamaters { expected: expected_polarities.len() });
 
        }
 
        // correct polarity list
 
        for (param_index, (&port, &expected_polarity)) in
 
            moved_port_list.iter().zip(expected_polarities.iter()).enumerate()
 
        {
 
            let polarity =
 
                self.endpoint_exts.get_occupied(port.0).ok_or(UnknownPort(port))?.info.polarity;
 
            if polarity != expected_polarity {
 
                return Err(WrongPortPolarity { param_index, port });
 
            }
 
        }
 
        let state = protocol.new_main_component(identifier, &moved_port_list);
 
        let component = Component {
 
            port_set: moved_port_set,
 
            protocol: protocol.clone(),
 
            identifier: identifier.clone(),
 
            state,
 
        };
 
        //////////////////////////////
 
        // success! mutate self and return Ok
 
        self.native_ports.retain(|e| !component.port_set.contains(e));
 
        self.components.new_occupied(component);
 
        Ok(())
 
    }
 
    pub fn new_channel(&mut self) -> (OutPort, InPort) {
 
        assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2);
 
        let channel_id = ChannelId {
 
            controller_id: self.controller_id,
 
            channel_index: self.channel_index_stream.next(),
 
        };
 
        let [e0, e1] = Endpoint::new_memory_pair();
 
        let kp = self.endpoint_exts.new_occupied(EndpointExt {
 
            info: EndpointInfo { channel_id, polarity: Putter },
 
            endpoint: e0,
 
        });
 
        let kg = self.endpoint_exts.new_occupied(EndpointExt {
 
            info: EndpointInfo { channel_id, polarity: Getter },
 
            endpoint: e1,
 
        });
 
        (OutPort(Port(kp)), InPort(Port(kg)))
 
    }
 
    pub fn sync_set(&mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs]) -> Result<(), ()> {
 
        Ok(())
 
    }
 
    pub fn sync_subsets(
 
        &mut self,
 
        _inbuf: &mut [u8],
 
        _ops: &mut [PortOpRs],
 
        bit_subsets: &[&[usize]],
 
    ) -> Result<usize, ()> {
 
        for (batch_index, bit_subset) in bit_subsets.iter().enumerate() {
 
            println!("batch_index {:?}", batch_index);
 
            let chunk_iter = bit_subset.iter().copied();
 
            for index in BitChunkIter::new(chunk_iter) {
 
                println!("  index {:?}", index);
 
            }
 
        }
 
        Ok(0)
 
    }
 
}
 

	
 
macro_rules! bitslice {
 
    ($( $num:expr  ),*) => {{
 
        &[0 $( | (1usize << $num)  )*]
 
    }};
 
}
 

	
 
#[test]
 
fn api_new_test() {
 
    let mut c = Connecting::default();
 
    let net_out: OutPort = c.bind(Coupling::Active, "127.0.0.1:8000".parse().unwrap());
 
    let net_in: InPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap());
 
    let proto_0 = Arc::new(ProtocolD::parse(b"").unwrap());
 
    let mut c = c.connect(None).unwrap();
 
    let (mem_out, mem_in) = c.new_channel();
 
    let mut inbuf = [0u8; 64];
 
    let identifier: Arc<[u8]> = b"sync".to_vec().into();
 
    c.new_component(&proto_0, &identifier, &[net_in.into(), mem_out.into()]).unwrap();
 
    let mut ops = [
 
        PortOpRs::In { msg_range: None, port: &mem_in },
 
        PortOpRs::Out { msg: b"hey", port: &net_out, optional: false },
 
        PortOpRs::Out { msg: b"hi?", port: &net_out, optional: true },
 
        PortOpRs::Out { msg: b"yo!", port: &net_out, optional: false },
 
    ];
 
    c.sync_set(&mut inbuf, &mut ops).unwrap();
 
    c.sync_subsets(&mut inbuf, &mut ops, &[bitslice! {0,1,2}]).unwrap();
 
}
 

	
 
#[repr(C)]
 
pub struct PortOp {
 
    msgptr: *mut u8, // read if OUT, field written if IN, will point into buf
 
    msglen: usize,   // read if OUT, written if IN, won't exceed buf
 
    port: Port,
 
    optional: bool, // no meaning if
 
}
 

	
 
pub enum PortOpRs<'a> {
 
    In { msg_range: Option<Range<usize>>, port: &'a InPort },
 
    Out { msg: &'a [u8], port: &'a OutPort, optional: bool },
 
}
 

	
 
unsafe fn c_sync_set(
 
    connected: &mut Connected,
 
    inbuflen: usize,
 
    inbufptr: *mut u8,
 
    opslen: usize,
 
    opsptr: *mut PortOp,
 
) -> i32 {
 
    let buf = as_mut_slice(inbuflen, inbufptr);
 
    let ops = as_mut_slice(opslen, opsptr);
 
    let (subset_index, wrote) = sync_inner(connected, buf);
 
    assert_eq!(0, subset_index);
 
    for op in ops {
 
        if let Some(range) = wrote.get(&op.port) {
 
            op.msgptr = inbufptr.add(range.start);
 
            op.msglen = range.end - range.start;
 
        }
 
    }
 
    0
 
}
 

	
 
use super::bits::{usizes_for_bits, BitChunkIter};
 
unsafe fn c_sync_subset(
 
    connected: &mut Connected,
 
    inbuflen: usize,
 
    inbufptr: *mut u8,
 
    opslen: usize,
 
    opsptr: *mut PortOp,
 
    subsetslen: usize,
 
    subsetsptr: *const *const usize,
 
) -> i32 {
 
    let buf: &mut [u8] = as_mut_slice(inbuflen, inbufptr);
 
    let ops: &mut [PortOp] = as_mut_slice(opslen, opsptr);
 
    let subsets: &[*const usize] = as_const_slice(subsetslen, subsetsptr);
 
    let subsetlen = usizes_for_bits(opslen);
 
    // don't yet know subsetptr; which subset fires unknown!
 

	
 
    let (subset_index, wrote) = sync_inner(connected, buf);
 
    let subsetptr: *const usize = subsets[subset_index];
 
    let subset: &[usize] = as_const_slice(subsetlen, subsetptr);
 

	
 
    for index in BitChunkIter::new(subset.iter().copied()) {
 
        let op = &mut ops[index as usize];
 
        if let Some(range) = wrote.get(&op.port) {
 
            op.msgptr = inbufptr.add(range.start);
 
            op.msglen = range.end - range.start;
 
        }
 
    }
 
    subset_index as i32
 
}
 

	
 
// dummy fn for the actual synchronous round
 
fn sync_inner<'c, 'b>(
 
    _connected: &'c mut Connected,
 
    _buf: &'b mut [u8],
 
) -> (usize, &'b HashMap<Port, Range<usize>>) {
 
    todo!()
 
}
 

	
 
unsafe fn as_mut_slice<'a, T>(len: usize, ptr: *mut T) -> &'a mut [T] {
 
    std::slice::from_raw_parts_mut(ptr, len)
 
}
 
unsafe fn as_const_slice<'a, T>(len: usize, ptr: *const T) -> &'a [T] {
 
    std::slice::from_raw_parts(ptr, len)
 
}
 

	
 
#[test]
 
fn api_connecting() {
 
    let addrs: [SocketAddr; 3] = [
 
        "127.0.0.1:8888".parse().unwrap(),
 
        "127.0.0.1:8889".parse().unwrap(),
 
        "127.0.0.1:8890".parse().unwrap(),
 
    ];
 

	
 
    lazy_static::lazy_static! {
 
        static ref PROTOCOL: Arc<ProtocolD> = {
 
            static PDL: &[u8] = b"
 
            primitive sync(in i, out o) {
 
                while(true) synchronous {
 
                    put(o, get(i));
 
                }
 
            }
 
            ";
 
            Arc::new(ProtocolD::parse(PDL).unwrap())
 
        };
 
    }
 

	
 
    const TIMEOUT: Option<Duration> = Some(Duration::from_secs(1));
 
    let handles = vec![
 
        std::thread::spawn(move || {
 
            let mut connecting = Connecting::default();
 
            let _a: OutPort = connecting.bind(Coupling::Passive, addrs[0]);
 
            let _b: OutPort = connecting.bind(Coupling::Active, addrs[1]);
 
            let connected = connecting.connect(TIMEOUT);
 
            println!("A: {:#?}", connected);
 
            let p_in: InPort = connecting.bind(Coupling::Passive, addrs[0]);
 
            let p_out: OutPort = connecting.bind(Coupling::Active, addrs[1]);
 
            let mut connected = connecting.connect(TIMEOUT).unwrap();
 
            let identifier = b"sync".to_vec().into();
 
            println!("connected {:#?}", &connected);
 
            connected.new_component(&PROTOCOL, &identifier, &[p_in.into(), p_out.into()]).unwrap();
 
            println!("connected {:#?}", &connected);
 
        }),
 
        std::thread::spawn(move || {
 
            let mut connecting = Connecting::default();
 
            let _a: InPort = connecting.bind(Coupling::Active, addrs[0]);
 
            let _a: OutPort = connecting.bind(Coupling::Active, addrs[0]);
 
            let _b: InPort = connecting.bind(Coupling::Passive, addrs[1]);
 
            let _c: InPort = connecting.bind(Coupling::Active, addrs[2]);
 
            let connected = connecting.connect(TIMEOUT);
 
            println!("B: {:#?}", connected);
 
            let _connected = connecting.connect(TIMEOUT).unwrap();
 
        }),
 
        std::thread::spawn(move || {
 
            let mut connecting = Connecting::default();
 
            let _a: OutPort = connecting.bind(Coupling::Passive, addrs[2]);
 
            let connected = connecting.connect(TIMEOUT);
 
            println!("C: {:#?}", connected);
 
            let _connected = connecting.connect(TIMEOUT).unwrap();
 
        }),
 
    ];
 
    for h in handles {
 
        h.join().unwrap();
 
    }
 
}
src/runtime/experimental/bits.rs
Show inline comments
 
use crate::common::*;
 
use std::alloc::Layout;
 

	
 
/// Given an iterator over BitChunk Items, iterates over the indices (each represented as a u32) for which the bit is SET,
 
/// treating the bits in the BitChunk as a contiguous array.
 
/// e.g. input [0b111000, 0b11] gives output [3, 4, 5, 32, 33].
 
/// observe that the bits per chunk are ordered from least to most significant bits, yielding smaller to larger usizes.
 
/// assumes chunk_iter will yield no more than std::u32::MAX / 32 chunks
 

	
 
pub const fn usize_bytes() -> usize {
 
    std::mem::size_of::<usize>()
 
}
 
pub const fn usize_bits() -> usize {
 
    usize_bytes() * 8
 
}
 
pub const fn usizes_for_bits(bits: usize) -> usize {
 
    (bits + (usize_bits() - 1)) / usize_bits()
 
}
 

	
 
type Chunk = usize;
 
type BitIndex = usize;
 
pub(crate) struct BitChunkIter<I: Iterator<Item = Chunk>> {
 
    cached: usize,
 
    chunk_iter: I,
 
    next_bit_index: BitIndex,
 
}
 
impl<I: Iterator<Item = Chunk>> BitChunkIter<I> {
 
    pub fn new(chunk_iter: I) -> Self {
 
        // first chunk is always a dummy zero, as if chunk_iter yielded Some(FALSE).
 
        // Consequences:
 
        // 1. our next_bit_index is always off by usize_bits() (we correct for it in Self::next) (no additional overhead)
 
        // 2. we cache Chunk and not Option<Chunk>, because chunk_iter.next() is only called in Self::next.
 
        Self { chunk_iter, next_bit_index: 0, cached: 0 }
 
    }
 
}
 
impl<I: Iterator<Item = Chunk>> Iterator for BitChunkIter<I> {
 
    type Item = BitIndex;
 
    fn next(&mut self) -> Option<Self::Item> {
 
        let mut chunk = self.cached;
 

	
 
        // loop until either:
 
        // 1. there are no more Items to return, or
 
        // 2. chunk encodes 1+ Items, one of which we will return.
 
        while chunk == 0 {
 
            // chunk has no bits set! get the next one...
 
            chunk = self.chunk_iter.next()?;
 

	
 
            // ... and jump self.next_bit_index to the next multiple of usize_bits().
 
            self.next_bit_index = (self.next_bit_index + usize_bits()) & !(usize_bits() - 1);
 
        }
 
        // there exists 1+ set bits in chunk
 
        // assert(chunk > 0);
 

	
 
        // Until the least significant bit of chunk is 1:
 
        // 1. shift chunk to the right,
 
        // 2. and increment self.next_bit_index accordingly
 
        // effectively performs a little binary search, shifting 32, then 16, ...
 
        // TODO perhaps there is a more efficient SIMD op for this?
 
        const N_INIT: BitIndex = usize_bits() / 2;
 
        let mut n = N_INIT;
 
        while n >= 1 {
 
            // n is [32,16,8,4,2,1] on 64-bit machine
 
            // this loop is unrolled with release optimizations
 
            let n_least_significant_mask = (1 << n) - 1;
 
            if chunk & n_least_significant_mask == 0 {
 
                // no 1 set within 0..n least significant bits.
 
                self.next_bit_index += n;
 
                chunk >>= n;
 
            }
 
            n /= 2;
 
        }
 
        // least significant bit of chunk is 1. Item to return is known.
 
        // assert(chunk & 1 == 1)
 

	
 
        // prepare our state for the next time Self::next is called.
 
        // Overwrite self.cached such that its shifted state is retained,
 
        // and jump over the bit whose index we are about to return.
 
        self.next_bit_index += 1;
 
        self.cached = chunk >> 1;
 

	
 
        // returned index is usize_bits() smaller than self.next_bit_index because we use an
 
        // off-by-usize_bits() encoding to avoid having to cache an Option<usize>.
 
        Some(self.next_bit_index - 1 - usize_bits())
 
    }
 
}
 

	
 
/*  --properties-->
 
     ___ ___ ___ ___
 
    |___|___|___|___|
 
  | |___|___|___|___|
 
  | |___|___|___|___|
 
  | |___|___|___|___|
 
  |
 
  V
 
 entity chunks (groups of size usize_bits())
 
*/
 

	
 
// TODO newtypes Entity and Property
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
 
struct Pair {
 
    entity: u32,
 
    property: u32,
 
}
 
impl From<[u32; 2]> for Pair {
 
    fn from([entity, property]: [u32; 2]) -> Self {
 
        Pair { entity, property }
 
    }
 
}
 
struct BitMatrix {
 
impl Default for BitMatrix {
 
    fn default() -> Self {
 
        Self::new(Pair { entity: 0, property: 0 })
 
    }
 
}
 
pub struct BitMatrix {
 
    buffer: *mut usize,
 
    bounds: Pair,
 
    layout: Layout, // layout of the currently-allocated buffer
 
}
 
impl Drop for BitMatrix {
 
    fn drop(&mut self) {
 
        unsafe {
 
            // ?
 
            std::alloc::dealloc(self.buffer as *mut u8, self.layout);
 
        }
 
    }
 
}
 
impl Debug for BitMatrix {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        let row_chunks = Self::row_chunks(self.bounds.property as usize);
 
        let column_chunks = Self::column_chunks(self.bounds.entity as usize);
 
        for property in 0..row_chunks {
 
            for entity_chunk in 0..column_chunks {
 
        struct FmtRow<'a> {
 
            me: &'a BitMatrix,
 
            property: usize,
 
        };
 
        impl Debug for FmtRow<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                let row_chunks = BitMatrix::row_chunks(self.me.bounds.property as usize);
 
                let column_chunks = BitMatrix::column_chunks(self.me.bounds.entity as usize);
 
                write!(f, "|")?;
 
                let mut chunk = unsafe { *self.buffer.add(row_chunks * entity_chunk + property) };
 
                let end = if entity_chunk + 1 == column_chunks {
 
                    self.bounds.entity % usize_bits() as u32
 
                } else {
 
                    usize_bits() as u32
 
                };
 
                for _ in 0..end {
 
                    let c = match chunk & 1 {
 
                        0 => '0',
 
                        _ => '1',
 
                for entity_chunk in 0..column_chunks {
 
                    let mut chunk =
 
                        unsafe { *self.me.buffer.add(row_chunks * entity_chunk + self.property) };
 
                    let end = if entity_chunk + 1 == column_chunks {
 
                        self.me.bounds.entity % usize_bits() as u32
 
                    } else {
 
                        usize_bits() as u32
 
                    };
 
                    write!(f, "{}", c)?;
 
                    chunk >>= 1;
 
                    for _ in 0..end {
 
                        let c = match chunk & 1 {
 
                            0 => '0',
 
                            _ => '1',
 
                        };
 
                        write!(f, "{}", c)?;
 
                        chunk >>= 1;
 
                    }
 
                    write!(f, "_")?;
 
                }
 
                Ok(())
 
            }
 
            write!(f, "|\n")?;
 
        }
 
        Ok(())
 
        let row_chunks = BitMatrix::row_chunks(self.bounds.property as usize);
 
        let iter = (0..row_chunks).map(move |property| FmtRow { me: self, property });
 
        f.debug_list().entries(iter).finish()
 
    }
 
}
 
impl BitMatrix {
 
    #[inline]
 
    const fn row_of(entity: usize) -> usize {
 
        entity / usize_bits()
 
    }
 
    #[inline]
 
    const fn row_chunks(property_bound: usize) -> usize {
 
        property_bound
 
    }
 
    #[inline]
 
    const fn column_chunks(entity_bound: usize) -> usize {
 
        usizes_for_bits(entity_bound + 1)
 
    }
 
    #[inline]
 
    fn offsets_unchecked(&self, at: Pair) -> [usize; 2] {
 
        let o_in = at.entity as usize % usize_bits();
 
        let row = Self::row_of(at.entity as usize);
 
        let row_chunks = self.bounds.property as usize;
 
        let o_of = row * row_chunks + at.property as usize;
 
        [o_of, o_in]
 
    }
 
    // returns a u32 which has bits 000...000111...111
 
    // for the last JAGGED chunk given the column size
 
    // if the last chunk is not jagged (when entity_bound % 32 == 0)
 
    // None is returned,
 
    // otherwise Some(x) is returned such that x & chunk would mask out
 
    // the bits NOT in 0..entity_bound
 
    fn last_row_chunk_mask(entity_bound: u32) -> Option<usize> {
 
        let zero_prefix_len = entity_bound as usize % usize_bits();
 
        if zero_prefix_len == 0 {
 
            None
 
        } else {
 
            Some(!0 >> (usize_bits() - zero_prefix_len))
 
        }
 
    }
 
    fn assert_within_bounds(&self, at: Pair) {
 
        assert!(at.entity < self.bounds.entity);
 
        assert!(at.property < self.bounds.property);
 
    }
 

	
 
    fn layout_for(total_chunks: usize) -> std::alloc::Layout {
 
        unsafe {
 
            // this layout is ALWAYS valid:
 
            // 1. size is always nonzero
 
            // 2. size is always a multiple of 4 and 4-aligned
 
            Layout::from_size_align_unchecked(usize_bytes() * total_chunks.max(1), usize_bytes())
 
        }
 
    }
 
    /////////
 

	
 
    fn reshape(&mut self, bounds: Pair) {
 
        todo!()
 
    }
 

	
 
    fn new(bounds: Pair) -> Self {
 
        let total_chunks = Self::row_chunks(bounds.property as usize)
 
            * Self::column_chunks(bounds.entity as usize);
 
        let layout = Self::layout_for(total_chunks);
 
        let buffer;
 
        unsafe {
 
            buffer = std::alloc::alloc(layout) as *mut usize;
 
            buffer.write_bytes(0u8, total_chunks);
 
        };
 
        Self { buffer, bounds, layout }
 
    }
 
    fn set(&mut self, at: Pair) {
 
        self.assert_within_bounds(at);
 
        let [o_of, o_in] = self.offsets_unchecked(at);
 
        unsafe { *self.buffer.add(o_of) |= 1 << o_in };
 
    }
 
    fn unset(&mut self, at: Pair) {
 
        self.assert_within_bounds(at);
 
        let [o_of, o_in] = self.offsets_unchecked(at);
 
        unsafe { *self.buffer.add(o_of) &= !(1 << o_in) };
 
    }
 
    fn test(&self, at: Pair) -> bool {
 
        self.assert_within_bounds(at);
 
        let [o_of, o_in] = self.offsets_unchecked(at);
 
        unsafe { *self.buffer.add(o_of) & 1 << o_in != 0 }
 
    }
 

	
 
    fn batch_mut<'a, 'b>(&mut self, mut chunk_mut_fn: impl FnMut(&'b mut [BitChunk])) {
 
        let row_chunks = Self::row_chunks(self.bounds.property as usize);
 
        let column_chunks = Self::column_chunks(self.bounds.entity as usize);
 
        let mut ptr = self.buffer;
 
        for _row in 0..column_chunks {
 
            let slice;
 
            unsafe {
 
                let slicey = std::slice::from_raw_parts_mut(ptr, row_chunks);
 
                slice = std::mem::transmute(slicey);
 
                ptr = ptr.add(row_chunks);
 
            }
 
            chunk_mut_fn(slice);
 
        }
 
        if let Some(mask) = Self::last_row_chunk_mask(self.bounds.entity) {
 
            // TODO TEST
 
            let mut ptr = unsafe { self.buffer.add((column_chunks - 1) * row_chunks) };
 
            for _ in 0..row_chunks {
 
                unsafe {
 
                    *ptr &= mask;
 
                    ptr = ptr.add(1);
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// given:
 
    /// 1. a buffer to work with
 
    /// 2. a _fold function_ for combining the properties of a given entity
 
    ///    and returning a new derived property (working )
 
    fn iter_entities_where<'a, 'b>(
 
        &'a self,
 
        buf: &'b mut Vec<usize>,
 
        mut fold_fn: impl FnMut(&'b [BitChunk]) -> BitChunk,
 
    ) -> impl Iterator<Item = u32> + 'b {
 
        let buf_start = buf.len();
 
        let row_chunks = Self::row_chunks(self.bounds.property as usize);
 
        let column_chunks = Self::column_chunks(self.bounds.entity as usize);
 
        let mut ptr = self.buffer;
 
        for _row in 0..column_chunks {
 
            let slice;
 
            unsafe {
 
                let slicey = std::slice::from_raw_parts(ptr, row_chunks);
 
                slice = std::mem::transmute(slicey);
 
                ptr = ptr.add(row_chunks);
 
            }
 
            let chunk = fold_fn(slice);
 
            buf.push(chunk.0);
 
        }
 
        if let Some(mask) = Self::last_row_chunk_mask(self.bounds.entity) {
 
            *buf.iter_mut().last().unwrap() &= mask;
 
        }
 
        BitChunkIter::new(buf.drain(buf_start..)).map(|x| x as u32)
 
    }
 
}
 

	
 
use derive_more::*;
 
#[derive(
 
    Debug, Copy, Clone, BitAnd, Not, BitOr, BitXor, BitAndAssign, BitOrAssign, BitXorAssign,
 
)]
 
#[repr(transparent)]
 
pub struct BitChunk(usize);
 
impl BitChunk {
 
    const fn any(self) -> bool {
 
        self.0 != FALSE.0
 
    }
 
    const fn all(self) -> bool {
 
        self.0 == TRUE.0
 
    }
 
}
 
const TRUE: BitChunk = BitChunk(!0);
 
const FALSE: BitChunk = BitChunk(0);
 

	
 
#[test]
 
fn matrix_test() {
 
    let mut m = BitMatrix::new(Pair { entity: 70, property: 3 });
 
    m.set([2, 0].into());
 
    m.set([40, 1].into());
 
    m.set([40, 2].into());
 
    m.set([40, 0].into());
 
    println!("{:?}", &m);
 

	
 
    m.batch_mut(|p| p[0] = TRUE);
 
    println!("{:?}", &m);
 

	
 
    for i in (0..40).step_by(7) {
 
        m.unset([i, 0].into());
 
    }
 
    m.unset([62, 0].into());
 
    println!("{:?}", &m);
 

	
 
    m.batch_mut(move |p| p[1] = p[0] ^ TRUE);
 
    println!("{:?}", &m);
 

	
 
    let mut buf = vec![];
 
    for index in m.iter_entities_where(&mut buf, move |p| p[1]) {
 
        println!("index {}", index);
 
    }
 
}
src/runtime/experimental/vec_storage.rs
Show inline comments
 
use super::bits::{usize_bits, BitChunkIter};
 
use crate::common::*;
 
use core::mem::MaybeUninit;
 

	
 
#[derive(Default)]
 
struct Bitvec(Vec<usize>);
 
impl Bitvec {
 
    #[inline(always)]
 
    fn offsets_of(i: usize) -> [usize; 2] {
 
        [i / usize_bits(), i % usize_bits()]
 
    }
 
    // assumes read will not go out of bounds
 
    unsafe fn insert(&mut self, i: usize) {
 
        let [o_of, o_in] = Self::offsets_of(i);
 
        let chunk = self.0.get_unchecked_mut(o_of);
 
        *chunk |= 1 << o_in;
 
    }
 
    // assumes read will not go out of bounds
 
    unsafe fn remove(&mut self, i: usize) -> bool {
 
        let [o_of, o_in] = Self::offsets_of(i);
 
        let chunk = self.0.get_unchecked_mut(o_of);
 
        let singleton_mask = 1 << o_in;
 
        let was = (*chunk & singleton_mask) != 0;
 
        *chunk &= !singleton_mask;
 
        was
 
    }
 
    // assumes read will not go out of bounds
 
    #[inline]
 
    unsafe fn contains(&self, i: usize) -> bool {
 
        let [o_of, o_in] = Self::offsets_of(i);
 
        (*self.0.get_unchecked(o_of) & (1 << o_in)) != 0
 
    }
 
    fn pop_first(&mut self) -> Option<usize> {
 
        let i = self.first()?;
 
        unsafe { self.remove(i) };
 
        Some(i)
 
    }
 
    fn iter(&self) -> impl Iterator<Item = usize> + '_ {
 
        BitChunkIter::new(self.0.iter().copied()).map(|x| x as usize)
 
    }
 
    fn first(&self) -> Option<usize> {
 
        self.iter().next()
 
    }
 
}
 

	
 
// A T-type arena which:
 
// 1. does not check for the ABA problem
 
// 2. imposes the object keys on the user
 
// 3. allows the reservation of a space (getting the key) to precede the value being provided.
 
// 4. checks for user error
 
//
 
// Data contains values in one of three states:
 
// 1. occupied: ininitialized. will be dropped.
 
// 2. vacant: uninitialized. may be reused implicitly. won't be dropped.
 
// 2. reserved: uninitialized. may be occupied implicitly. won't be dropped.
 
//
 
// element access is O(1)
 
// removal is O(1) amortized.
 
// insertion is O(N) with a small constant factor,
 
//    doing at worst one linear probe through N/(word_size) contiguous words
 
//
 
// invariant A: data elements are inititalized <=> occupied bit is set
 
// invariant B: occupied and vacant have an empty intersection
 
// invariant C: (vacant U occupied) subset of (0..data.len)
 
// invariant D: last element of data is not in VACANT state
 
// invariant E: number of allocated bits in vacant and occupied >= data.len()
 
// invariant F: vacant_bit_count == vacant.iter().count()
 
pub struct VecStorage<T> {
 
    data: Vec<MaybeUninit<T>>,
 
    occupied: Bitvec,
 
    vacant: Bitvec,
 
    occupied_bit_count: usize,
 
}
 
impl<T> Default for VecStorage<T> {
 
    fn default() -> Self {
 
        Self {
 
            data: Default::default(),
 
            vacant: Default::default(),
 
            occupied: Default::default(),
 
            occupied_bit_count: 0,
 
        }
 
    }
 
}
 
impl<T: Debug> Debug for VecStorage<T> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        enum FmtT<'a, T> {
 
            Vacant,
 
            Reserved,
 
            Occupied(&'a T),
 
            Vacant(usize),
 
            Reserved(usize),
 
            Occupied(usize, &'a T),
 
        };
 
        impl<T: Debug> Debug for FmtT<'_, T> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                match self {
 
                    FmtT::Vacant => write!(f, "Vacant"),
 
                    FmtT::Reserved => write!(f, "Reserved"),
 
                    FmtT::Occupied(t) => write!(f, "Occupied({:?})", t),
 
                    FmtT::Vacant(i) => write!(f, "{} => Vacant", i),
 
                    FmtT::Reserved(i) => write!(f, "{} =>Reserved", i),
 
                    FmtT::Occupied(i, t) => {
 
                        write!(f, "{} => Occupied(", i)?;
 
                        t.fmt(f)?;
 
                        write!(f, ")")
 
                    }
 
                }
 
            }
 
        }
 

	
 
        let iter = (0..self.data.len()).map(|i| unsafe {
 
            // 1. data bounds are checked by construction of i.
 
            // 2. occupied index => valid data is read.
 
            // 3. bitset bounds are ensured by invariant E.
 
            if self.occupied.contains(i) {
 
                FmtT::Occupied(&*self.data.get_unchecked(i).as_ptr())
 
                FmtT::Occupied(i, &*self.data.get_unchecked(i).as_ptr())
 
            } else if self.vacant.contains(i) {
 
                FmtT::Vacant
 
                FmtT::Vacant(i)
 
            } else {
 
                FmtT::Reserved
 
                FmtT::Reserved(i)
 
            }
 
        });
 
        f.debug_list().entries(iter).finish()
 
    }
 
}
 
impl<T> Drop for VecStorage<T> {
 
    fn drop(&mut self) {
 
        self.clear();
 
    }
 
}
 
impl<T> VecStorage<T> {
 
    // ASSUMES that i in 0..self.data.len()
 
    unsafe fn get_occupied_unchecked(&self, i: usize) -> Option<&T> {
 
        if self.occupied.contains(i) {
 
            None
 
        } else {
 
            // 2. Invariant A => reading valid ata
 
            Some(&*self.data.get_unchecked(i).as_ptr())
 
        } else {
 
            None
 
        }
 
    }
 
    //////////////
 
    pub fn len(&self) -> usize {
 
        self.occupied_bit_count
 
    }
 
    pub fn with_reserved_range(range_end: usize) -> Self {
 
        let mut data = Vec::with_capacity(range_end);
 
        unsafe {
 
            // data is uninitialized, as intended
 
            data.set_len(range_end);
 
        }
 
        let bitset_len = (range_end + (usize_bits() - 1)) / usize_bits();
 
        let chunk_iter = std::iter::repeat(0usize).take(bitset_len);
 
        Self {
 
            data,
 
            vacant: Bitvec(chunk_iter.clone().collect()),
 
            occupied: Bitvec(chunk_iter.collect()),
 
            occupied_bit_count: 0,
 
        }
 
    }
 
    pub fn clear(&mut self) {
 
        for i in 0..self.data.len() {
 
            // SAFE: bitvec bounds ensured by invariant E
 
            if unsafe { self.occupied.contains(i) } {
 
                // invariant A: this element is OCCUPIED
 
                unsafe {
 
                    // 1. by construction, i is in bounds
 
                    // 2. i is occupied => initialized data is being dropped
 
                    drop(self.data.get_unchecked_mut(i).as_ptr().read());
 
                }
 
            }
 
        }
 
        self.vacant.0.clear();
 
        self.occupied.0.clear();
 
        self.occupied_bit_count = 0;
 
    }
 
    pub fn iter(&self) -> impl Iterator<Item = &T> {
 
        (0..self.data.len()).filter_map(move |i| unsafe { self.get_occupied_unchecked(i) })
 
    }
 
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
 
        (0..self.data.len()).filter_map(move |i| unsafe {
 
            // SAFE: bitvec bounds ensured by invariant E
 
            if self.occupied.contains(i) {
 
                // Invariant A => reading valid data
 
                Some(&mut *self.data.get_unchecked_mut(i).as_mut_ptr())
 
            } else {
 
                None
 
            }
 
        })
 
    }
 
    pub fn get_occupied(&self, i: usize) -> Option<&T> {
 
        if i >= self.data.len() {
 
            None
 
        } else {
 
            unsafe {
 
                // index is within bounds
 
                self.get_occupied_unchecked(i)
 
            }
 
        }
 
    }
 
    pub fn get_occupied_mut(&mut self, i: usize) -> Option<&mut T> {
 
        // SAFE: bitvec bounds ensured by invariant E
 
        if i < self.data.len() && unsafe { self.occupied.contains(i) } {
 
            unsafe {
 
                // 1. index is within bounds
 
                // 2. Invariant A => reading valid ata
 
                Some(&mut *self.data.get_unchecked_mut(i).as_mut_ptr())
 
            }
 
        } else {
 
            None
 
        }
 
    }
 
    pub fn new_reserved(&mut self) -> usize {
 
        if let Some(i) = self.vacant.pop_first() {
 
            i
 
        } else {
 
            let bitsets_need_another_chunk = self.data.len() % usize_bits() == 0;
 
            // every (usize_bits())th time self.data grows by 1, bitsets grow by usize_bits().
 
            if bitsets_need_another_chunk {
 
                self.vacant.0.push(0usize);
 
                self.occupied.0.push(0usize);
 
            }
 
            self.data.push(MaybeUninit::uninit());
 
            self.data.len() - 1
 
        }
 
    }
 
    pub fn occupy_reserved(&mut self, i: usize, t: T) {
 
        // SAFE: bitvec bounds ensured by invariant E
 
        assert!(i < self.data.len());
 
        // element is within bounds
 
        assert!(unsafe { !self.occupied.contains(i) && !self.vacant.contains(i) });
 
        // element is surely reserved
 
        unsafe {
 
            // 1. invariant C => write is within bounds
 
            // 2. i WAS reserved => no initialized data is being overwritten
 
            self.data.get_unchecked_mut(i).as_mut_ptr().write(t);
 
            self.occupied.insert(i);
 
        };
 
        self.occupied_bit_count += 1;
 
    }
 
    pub fn new_occupied(&mut self, t: T) -> usize {
 
        let i = self.new_reserved();
 
        unsafe {
 
            // 1. invariant C => write is within bounds
 
            // 2. i WAS reserved => no initialized data is being overwritten
 
            self.data.get_unchecked_mut(i).as_mut_ptr().write(t);
 
            self.occupied.insert(i);
 
        };
 
        self.occupied_bit_count += 1;
 
        i
 
    }
 
    pub fn vacate(&mut self, i: usize) -> Option<T> {
 
        // SAFE: bitvec bounds ensured by invariant E
 
        if i >= self.data.len() || unsafe { self.vacant.contains(i) } {
 
            // already vacant. nothing to do here
 
            return None;
 
        }
 
        // i is certainly within bounds of self.data
 
        // SAFE: bitvec bounds ensured by invariant E
 
        let value = if unsafe { self.occupied.remove(i) } {
 
            unsafe {
 
                // 1. index is within bounds
 
                // 2. i is occupied => initialized data is being read
 
                self.occupied_bit_count -= 1;
 
                Some(self.data.get_unchecked_mut(i).as_ptr().read())
 
            }
 
        } else {
 
            // reservations have no data to drop
 
            None
 
        };
 
        // Mark as vacant...
 
        if i + 1 == self.data.len() {
 
            // ... by truncating self.data.
 
            // must truncate to avoid violating invariant D.
 
            // pops at least once:
 
            while let Some(_) = self.data.pop() {
 
                let pop_next = self
 
                    .data
 
                    .len()
 
                    .checked_sub(1)
 
                    .map(|index| unsafe {
 
                        // SAFE: bitvec bounds ensured by invariant E
 
                        self.vacant.remove(index)
 
                    })
 
                    .unwrap_or(false);
 
                if !pop_next {
 
                    break;
 
                }
 
            }
 
        } else {
 
            // ... by populating self.vacant.
 
            // SAFE: bitvec bounds ensured by invariant E
 
            unsafe { self.vacant.insert(i) };
 
        }
 
        value
 
    }
 
    pub fn iter_reserved(&self) -> impl Iterator<Item = usize> + '_ {
 
        BitChunkIter::new(self.occupied.0.iter().zip(self.vacant.0.iter()).map(|(&a, &b)| !(a | b)))
 
            .take_while(move |&x| x < self.data.len())
 
    }
 
}
 

	
 
#[test]
 
fn vec_storage() {
 
    #[derive(Debug)]
 
    struct Foo;
 
    impl Drop for Foo {
 
        fn drop(&mut self) {
 
            println!("DROPPING FOO!");
 
        }
 
    }
 
    let mut v = VecStorage::with_reserved_range(4);
 
    let i0 = v.new_occupied(Foo);
 
    println!("{:?}", &v);
 

	
 
    let i1 = v.new_reserved();
 
    println!("{:?}", &v);
 

	
 
    println!("reserved {:?}", v.iter_reserved().collect::<Vec<_>>());
 

	
 
    println!("q {:?}", v.vacate(i0));
 
    println!("{:?}", &v);
 

	
 
    println!("q {:?}", v.vacate(2));
 
    println!("{:?}", &v);
 

	
 
    println!("q {:?}", v.vacate(1));
 
    println!("{:?}", &v);
 

	
 
    v.occupy_reserved(i1, Foo);
 
    println!("{:?}", &v);
0 comments (0 inline, 0 general)