Changeset - 8ea9f0e9a5ab
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-02-21 10:23:17
christopher.esterhuyse@gmail.com
fusing ports and keys
5 files changed with 433 insertions and 77 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -39,13 +39,15 @@ pub struct ChannelId {
 
pub enum Polarity {
 
    Putter, // output port (from the perspective of the component)
 
    Getter, // input port (from the perspective of the component)
 
}
 

	
 
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone, Debug)]
 
pub struct Key(u64);
 
#[repr(C)]
 
pub struct Port(pub usize); // ports are COPY
 
pub type Key = Port;
 

	
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum MainComponentErr {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
}
 
@@ -102,16 +104,16 @@ pub trait PolyContext {
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool>;
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload>;
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl Key {
 
    pub fn from_raw(raw: u64) -> Self {
 
    pub fn from_raw(raw: usize) -> Self {
 
        Self(raw)
 
    }
 
    pub fn to_raw(self) -> u64 {
 
    pub fn to_raw(self) -> usize {
 
        self.0
 
    }
 
    pub fn to_token(self) -> mio::Token {
 
        mio::Token(self.0.try_into().unwrap())
 
    }
 
    pub fn from_token(t: mio::Token) -> Self {
src/runtime/experimental/api.rs
Show inline comments
 
use super::vec_storage::VecStorage;
 
use crate::common::*;
 
use crate::runtime::endpoint::Endpoint;
 
use crate::runtime::endpoint::EndpointExt;
 
use crate::runtime::endpoint::EndpointInfo;
 
use crate::runtime::endpoint::{Endpoint, Msg, SetupMsg};
 
use crate::runtime::MessengerState;
 
use crate::runtime::Messengerlike;
 
use crate::runtime::ReceivedMsg;
 

	
 
use std::net::SocketAddr;
 
use std::sync::Arc;
 

	
 
pub enum Polarity {
 
    In,
 
    Out,
 
}
 
pub enum Coupling {
 
    Active,
 
    Passive,
 
}
 

	
 
struct Family {
 
    parent: Option<Port>,
 
    children: HashSet<Port>,
 
}
 

	
 
pub struct Binding {
 
    pub coupling: Coupling,
 
    pub polarity: Polarity,
 
    pub addr: SocketAddr,
 
}
 
impl From<(Coupling, Polarity, SocketAddr)> for Binding {
 
    fn from((coupling, polarity, addr): (Coupling, Polarity, SocketAddr)) -> Self {
 
        Self { coupling, polarity, addr }
 
    }
 
}
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 
#[repr(C)]
 
pub struct Port(pub u32);
 
pub struct InPort(Port); // InPort and OutPort are AFFINE (exposed to Rust API)
 
pub struct OutPort(Port);
 
impl From<InPort> for Port {
 
    fn from(x: InPort) -> Self {
 
        x.0
 
    }
 
}
 
impl From<OutPort> for Port {
 
    fn from(x: OutPort) -> Self {
 
        x.0
 
    }
 
}
 
pub struct InPort(Port);
 
pub struct OutPort(Port);
 

	
 
#[derive(Default)]
 
struct ChannelIndexStream {
 
    next: u32,
 
}
 
impl ChannelIndexStream {
 
@@ -56,101 +54,398 @@ enum Connector {
 
    Connecting(Connecting),
 
    Connected(Connected),
 
}
 

	
 
#[derive(Default)]
 
pub struct Connecting {
 
    bindings: Vec<Binding>, // invariant: no more than std::u32::MAX entries
 
    bindings: Vec<Binding>,
 
}
 
trait Binds<T> {
 
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> T;
 
}
 
impl Binds<InPort> for Connecting {
 
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> InPort {
 
        self.bindings.push((coupling, Polarity::In, addr).into());
 
        let pid: u32 = (self.bindings.len() - 1).try_into().expect("Port ID overflow!");
 
        InPort(Port(pid))
 
        self.bindings.push(Binding { coupling, polarity: Polarity::Getter, addr });
 
        InPort(Port(self.bindings.len() - 1))
 
    }
 
}
 
impl Binds<OutPort> for Connecting {
 
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> OutPort {
 
        self.bindings.push((coupling, Polarity::Out, addr).into());
 
        let pid: u32 = (self.bindings.len() - 1).try_into().expect("Port ID overflow!");
 
        OutPort(Port(pid))
 
        self.bindings.push(Binding { coupling, polarity: Polarity::Putter, addr });
 
        OutPort(Port(self.bindings.len() - 1))
 
    }
 
}
 
impl Connecting {
 
    pub fn connect(&mut self, _timeout: Option<Duration>) -> Result<Connected, ()> {
 
        let controller_id = 42;
 
        let channel_index_stream = ChannelIndexStream::default();
 
        let native_ports = (0..self.bindings.len()).map(|x| Port(x as u32)).collect();
 
        self.bindings.clear();
 
    fn random_controller_id() -> ControllerId {
 
        type Bytes8 = [u8; std::mem::size_of::<ControllerId>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        unsafe {
 
            // safe:
 
            // 1. All random bytes give valid Bytes8
 
            // 2. Bytes8 and ControllerId have same valid representations
 
            std::mem::transmute::<Bytes8, ControllerId>(bytes)
 
        }
 
    }
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
        use std::io::Write;
 
        stream.write(&[]).is_ok()
 
    }
 
    fn new_connected(
 
        &self,
 
        controller_id: ControllerId,
 
        protocol: &Arc<Protocol>,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ()> {
 
        // TEMP: helper functions until Key is unified with Port
 
        #[inline]
 
        fn key2port(ekey: Key) -> Port {
 
            Port(ekey.to_raw() as usize)
 
        }
 
        #[inline]
 
        fn port2key(port: Port) -> Key {
 
            Key::from_raw(port.0)
 
        }
 

	
 
        // 1. bindings correspond with ports 0..bindings.len(). For each:
 
        //    - reserve a slot in endpoint_exts.
 
        //    - store the port in `native_ports' set.
 
        let mut endpoint_exts = VecStorage::<EndpointExt>::with_reserved_range(self.bindings.len());
 
        let native_ports = (0..self.bindings.len()).map(Port).collect();
 

	
 
        // 2. create MessengerState structure for polling channels
 
        let edge = PollOpt::edge();
 
        let [ready_r, ready_w] = [Ready::readable(), Ready::writable()];
 
        let mut ms = MessengerState {
 
            poll: Poll::new().map_err(drop)?,
 
            events: Events::with_capacity(self.bindings.len()),
 
            delayed: vec![],
 
            undelayed: vec![],
 
            polled_undrained: Default::default(),
 
        };
 

	
 
        // 3. create one TODO task per (port,binding) as a vector with indices in lockstep.
 
        //    we will drain it gradually so we store elements of type Option<Todo> where all are initially Some(_)
 
        enum Todo {
 
            PassiveAccepting { listener: TcpListener, channel_id: ChannelId },
 
            ActiveConnecting { stream: TcpStream },
 
            PassiveConnecting { stream: TcpStream, channel_id: ChannelId },
 
            ActiveRecving { endpoint: Endpoint },
 
        }
 
        let mut channel_index_stream = ChannelIndexStream::default();
 
        let mut todos = self
 
            .bindings
 
            .iter()
 
            .enumerate()
 
            .map(|(index, binding)| {
 
                Ok(Some(match binding.coupling {
 
                    Coupling::Passive => {
 
                        let channel_index = channel_index_stream.next();
 
                        let channel_id = ChannelId { controller_id, channel_index };
 
                        let listener = TcpListener::bind(&binding.addr).map_err(drop)?;
 
                        ms.poll.register(&listener, Token(index), ready_r, edge).unwrap(); // registration unique
 
                        Todo::PassiveAccepting { listener, channel_id }
 
                    }
 
                    Coupling::Active => {
 
                        let stream = TcpStream::connect(&binding.addr).map_err(drop)?;
 
                        ms.poll.register(&stream, Token(index), ready_w, edge).unwrap(); // registration unique
 
                        Todo::ActiveConnecting { stream }
 
                    }
 
                }))
 
            })
 
            .collect::<Result<Vec<Option<Todo>>, ()>>()?;
 
        let mut num_todos_remaining = todos.len();
 

	
 
        // 4. handle incoming events until all TODOs are completed OR we timeout
 
        let deadline = timeout.map(|t| Instant::now() + t);
 
        let mut polled_undrained_later = IndexSet::<_>::default();
 
        let mut backoff_millis = 10;
 
        while num_todos_remaining > 0 {
 
            ms.poll_events_until(deadline).map_err(drop)?;
 
            for event in ms.events.iter() {
 
                let token = event.token();
 
                let index = token.0;
 
                let binding = &self.bindings[index];
 
                match todos[index].take() {
 
                    None => {
 
                        polled_undrained_later.insert(index);
 
                    }
 
                    Some(Todo::PassiveAccepting { listener, channel_id }) => {
 
                        let (stream, _peer_addr) = listener.accept().map_err(drop)?;
 
                        ms.poll.deregister(&listener).expect("wer");
 
                        ms.poll.register(&stream, token, ready_w, edge).expect("3y5");
 
                        todos[index] = Some(Todo::PassiveConnecting { stream, channel_id });
 
                    }
 
                    Some(Todo::ActiveConnecting { mut stream }) => {
 
                        let todo = if Self::test_stream_connectivity(&mut stream) {
 
                            ms.poll.reregister(&stream, token, ready_r, edge).expect("52");
 
                            let endpoint = Endpoint::from_fresh_stream(stream);
 
                            Todo::ActiveRecving { endpoint }
 
                        } else {
 
                            ms.poll.deregister(&stream).expect("wt");
 
                            std::thread::sleep(Duration::from_millis(backoff_millis));
 
                            backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3;
 
                            let stream = TcpStream::connect(&binding.addr).unwrap();
 
                            ms.poll.register(&stream, token, ready_w, edge).expect("PAC 3");
 
                            Todo::ActiveConnecting { stream }
 
                        };
 
                        todos[index] = Some(todo);
 
                    }
 
                    Some(Todo::PassiveConnecting { mut stream, channel_id }) => {
 
                        if !Self::test_stream_connectivity(&mut stream) {
 
                            return Err(());
 
                        }
 
                        ms.poll.reregister(&stream, token, ready_r, edge).expect("55");
 
                        let polarity = binding.polarity;
 
                        let info = EndpointInfo { polarity, channel_id };
 
                        let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                        let mut endpoint = Endpoint::from_fresh_stream(stream);
 
                        endpoint.send(msg).map_err(drop)?;
 
                        let endpoint_ext = EndpointExt { endpoint, info };
 
                        endpoint_exts.occupy_reserved(index, endpoint_ext);
 
                        num_todos_remaining -= 1;
 
                    }
 
                    Some(Todo::ActiveRecving { mut endpoint }) => {
 
                        // log!(logger, "{:03?} start ActiveRecving...", major);
 
                        // assert!(event.readiness().is_readable());
 
                        let ekey = Key::from_raw(index);
 
                        'recv_loop: while let Some(msg) = endpoint.recv().map_err(drop)? {
 
                            if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg {
 
                                if info.polarity == binding.polarity {
 
                                    return Err(());
 
                                }
 
                                let channel_id = info.channel_id;
 
                                let info = EndpointInfo { polarity: binding.polarity, channel_id };
 
                                ms.polled_undrained.insert(ekey);
 
                                let endpoint_ext = EndpointExt { endpoint, info };
 
                                endpoint_exts.occupy_reserved(index, endpoint_ext);
 
                                num_todos_remaining -= 1;
 
                                break 'recv_loop;
 
                            } else {
 
                                ms.delayed.push(ReceivedMsg { recipient: ekey, msg });
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        assert_eq!(None, endpoint_exts.iter_reserved().next());
 
        drop(todos);
 

	
 
        // 1. construct `family', i.e. perform the sink tree setup procedure
 

	
 
        use {Msg::SetupMsg as S, SetupMsg::*};
 
        let mut messenger = (&mut ms, &mut endpoint_exts);
 
        impl Messengerlike for (&mut MessengerState, &mut VecStorage<EndpointExt>) {
 
            fn get_state_mut(&mut self) -> &mut MessengerState {
 
                self.0
 
            }
 
            fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint {
 
                &mut self
 
                    .1
 
                    .get_occupied_mut(ekey.to_raw() as usize)
 
                    .expect("OUT OF BOUNDS")
 
                    .endpoint
 
            }
 
        }
 

	
 
        // 1. broadcast my ID as the first echo. await reply from all in net_keylist
 
        let neighbors = (0..self.bindings.len()).map(Port);
 
        let echo = S(LeaderEcho { maybe_leader: controller_id });
 
        let mut awaiting = IndexSet::<Port>::with_capacity(neighbors.len());
 
        for n in neighbors.clone() {
 
            messenger.send(port2key(n), echo.clone()).map_err(drop)?;
 
            awaiting.insert(n);
 
        }
 

	
 
        // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
        //    adopt it as leader, sender as parent, and reset the await set.
 
        let mut parent: Option<Port> = None;
 
        let mut my_leader = controller_id;
 
        messenger.undelay_all();
 
        'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
            let ReceivedMsg { recipient, msg } =
 
                messenger.recv_until(deadline).map_err(drop)?.ok_or(())?;
 
            let recipient = key2port(recipient);
 
            match msg {
 
                S(LeaderAnnounce { leader }) => {
 
                    // someone else completed the echo and became leader first!
 
                    // the sender is my parent
 
                    parent = Some(recipient);
 
                    my_leader = leader;
 
                    awaiting.clear();
 
                    break 'echo_loop;
 
                }
 
                S(LeaderEcho { maybe_leader }) => {
 
                    use Ordering::*;
 
                    match maybe_leader.cmp(&my_leader) {
 
                        Less => { /* ignore */ }
 
                        Equal => {
 
                            awaiting.remove(&recipient);
 
                            if awaiting.is_empty() {
 
                                if let Some(p) = parent {
 
                                    // return the echo to my parent
 
                                    messenger
 
                                        .send(port2key(p), S(LeaderEcho { maybe_leader }))
 
                                        .map_err(drop)?;
 
                                } else {
 
                                    // DECIDE!
 
                                    break 'echo_loop;
 
                                }
 
                            }
 
                        }
 
                        Greater => {
 
                            // join new echo
 
                            parent = Some(recipient);
 
                            my_leader = maybe_leader;
 
                            let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                            awaiting.clear();
 
                            if neighbors.len() == 1 {
 
                                // immediately reply to parent
 
                                messenger.send(port2key(recipient), echo.clone()).map_err(drop)?;
 
                            } else {
 
                                for n in neighbors.clone() {
 
                                    if n != recipient {
 
                                        messenger.send(port2key(n), echo.clone()).map_err(drop)?;
 
                                        awaiting.insert(n);
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 
                msg => messenger.delay(ReceivedMsg { recipient: port2key(recipient), msg }),
 
            }
 
        }
 
        match parent {
 
            None => assert_eq!(
 
                my_leader, controller_id,
 
                "I've got no parent, but I consider {:?} the leader?",
 
                my_leader
 
            ),
 
            Some(parent) => assert_ne!(
 
                my_leader, controller_id,
 
                "I have {:?} as parent, but I consider myself ({:?}) the leader?",
 
                parent, controller_id
 
            ),
 
        }
 

	
 
        // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
        //    in this loop, every node sends 1 message to each neighbor
 
        let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
        for n in neighbors.clone() {
 
            let msg =
 
                if Some(n) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() };
 
            messenger.send(port2key(n), msg).map_err(drop)?;
 
        }
 

	
 
        // await 1 message from all non-parents
 
        for n in neighbors.clone() {
 
            if Some(n) != parent {
 
                awaiting.insert(n);
 
            }
 
        }
 
        let mut children = HashSet::default();
 
        messenger.undelay_all();
 
        while !awaiting.is_empty() {
 
            let ReceivedMsg { recipient, msg } =
 
                messenger.recv_until(deadline).map_err(drop)?.ok_or(())?;
 
            let recipient = key2port(recipient);
 
            match msg {
 
                S(YouAreMyParent) => {
 
                    assert!(awaiting.remove(&recipient));
 
                    children.insert(recipient);
 
                }
 
                S(SetupMsg::LeaderAnnounce { leader }) => {
 
                    assert!(awaiting.remove(&recipient));
 
                    assert!(leader == my_leader);
 
                    assert!(Some(recipient) != parent);
 
                    // they wouldn't send me this if they considered me their parent
 
                }
 
                _ => messenger.delay(ReceivedMsg { recipient: port2key(recipient), msg }),
 
            }
 
        }
 
        let family = Family { parent, children };
 

	
 
        // 1. done! return
 
        Ok(Connected {
 
            controller_id,
 
            channel_index_stream,
 
            components: vec![],
 
            endpoint_exts: vec![],
 
            protocol: protocol.clone(),
 
            endpoint_exts,
 
            native_ports,
 
            family,
 
        })
 
    }
 
    /////////
 
    pub fn connect_using_id(
 
        &mut self,
 
        controller_id: ControllerId,
 
        protocol: &Arc<Protocol>,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ()> {
 
        // 1. try and create a connection from these bindings with self immutable.
 
        let connected = self.new_connected(controller_id, protocol, timeout)?;
 
        // 2. success! drain self and return
 
        self.bindings.clear();
 
        Ok(connected)
 
    }
 
    pub fn connect(
 
        &mut self,
 
        protocol: &Arc<Protocol>,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ()> {
 
        self.connect_using_id(Self::random_controller_id(), protocol, timeout)
 
    }
 
}
 
pub struct Protocol;
 
impl Protocol {
 
    pub fn parse(_pdl_text: &[u8]) -> Result<Self, ()> {
 
        Ok(Protocol)
 
    }
 
}
 
struct ComponentExt {
 
    protocol: Arc<Protocol>,
 
    ports: HashSet<Port>,
 
    name: Vec<u8>,
 
}
 
// struct ComponentExt {
 
//     protocol: Arc<Protocol>,
 
//     ports: HashSet<Port>,
 
//     name: Vec<u8>,
 
// }
 
pub struct Connected {
 
    native_ports: HashSet<Port>,
 
    controller_id: ControllerId,
 
    channel_index_stream: ChannelIndexStream,
 
    endpoint_exts: Vec<EndpointExt>, // invaraint
 
    components: Vec<ComponentExt>,
 
    endpoint_exts: VecStorage<EndpointExt>,
 
    protocol: Arc<Protocol>,
 
    family: Family,
 
    // components: Vec<ComponentExt>,
 
}
 
impl Connected {
 
    pub fn new_channel(&mut self) -> (OutPort, InPort) {
 
        assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2);
 
        let ports =
 
            [Port(self.endpoint_exts.len() as u32 - 1), Port(self.endpoint_exts.len() as u32)];
 
        let channel_id = ChannelId {
 
            controller_id: self.controller_id,
 
            channel_index: self.channel_index_stream.next(),
 
        };
 
        let [e0, e1] = Endpoint::new_memory_pair();
 
        self.endpoint_exts.push(EndpointExt {
 
        let kp = self.endpoint_exts.new_occupied(EndpointExt {
 
            info: EndpointInfo { channel_id, polarity: Putter },
 
            endpoint: e0,
 
        });
 
        self.endpoint_exts.push(EndpointExt {
 
        let kg = self.endpoint_exts.new_occupied(EndpointExt {
 
            info: EndpointInfo { channel_id, polarity: Getter },
 
            endpoint: e1,
 
        });
 
        for p in ports.iter() {
 
            self.native_ports.insert(Port(p.0));
 
        }
 
        (OutPort(ports[0]), InPort(ports[1]))
 
        (OutPort(Port(kp)), InPort(Port(kg)))
 
    }
 
    pub fn new_component(
 
        &mut self,
 
        protocol: &Arc<Protocol>,
 
        name: Vec<u8>,
 
        moved_ports: &[Port],
 
    ) -> Result<(), ()> {
 
    pub fn new_component(&mut self, _name: Vec<u8>, moved_ports: &[Port]) -> Result<(), ()> {
 
        let moved_ports = moved_ports.iter().copied().collect();
 
        if !self.native_ports.is_superset(&moved_ports) {
 
            return Err(());
 
        }
 
        self.native_ports.retain(|e| !moved_ports.contains(e));
 
        self.components.push(ComponentExt { ports: moved_ports, protocol: protocol.clone(), name });
 
        // TODO add a singleton machine
 
        Ok(())
 
        // self.components.push(ComponentExt { ports: moved_ports, protocol: protocol.clone(), name });
 
        todo!()
 
    }
 
    pub fn sync_set(&mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs]) -> Result<(), ()> {
 
        Ok(())
 
    }
 
    pub fn sync_subsets(
 
        &mut self,
 
@@ -178,16 +473,16 @@ macro_rules! bitslice {
 
#[test]
 
fn api_new_test() {
 
    let mut c = Connecting::default();
 
    let net_out: OutPort = c.bind(Coupling::Active, "127.0.0.1:8000".parse().unwrap());
 
    let net_in: InPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap());
 
    let proto_0 = Arc::new(Protocol::parse(b"").unwrap());
 
    let mut c = c.connect(None).unwrap();
 
    let mut c = c.connect(&proto_0, None).unwrap();
 
    let (mem_out, mem_in) = c.new_channel();
 
    let mut inbuf = [0u8; 64];
 
    c.new_component(&proto_0, b"sync".to_vec(), &[net_in.into(), mem_out.into()]).unwrap();
 
    c.new_component(b"sync".to_vec(), &[net_in.into(), mem_out.into()]).unwrap();
 
    let mut ops = [
 
        PortOpRs::In { msg_range: None, port: &mem_in },
 
        PortOpRs::Out { msg: b"hey", port: &net_out, optional: false },
 
        PortOpRs::Out { msg: b"hi?", port: &net_out, optional: true },
 
        PortOpRs::Out { msg: b"yo!", port: &net_out, optional: false },
 
    ];
src/runtime/experimental/vec_storage.rs
Show inline comments
 
@@ -61,20 +61,27 @@ impl Bitvec {
 
//
 
// invariant A: data elements are inititalized <=> occupied bit is set
 
// invariant B: occupied and vacant have an empty intersection
 
// invariant C: (vacant U occupied) subset of (0..data.len)
 
// invariant D: last element of data is not in VACANT state
 
// invariant E: number of allocated bits in vacant and occupied >= data.len()
 
// invariant F: vacant_bit_count == vacant.iter().count()
 
pub struct VecStorage<T> {
 
    data: Vec<MaybeUninit<T>>,
 
    occupied: Bitvec,
 
    vacant: Bitvec,
 
    occupied_bit_count: usize,
 
}
 
impl<T> Default for VecStorage<T> {
 
    fn default() -> Self {
 
        Self { data: Default::default(), vacant: Default::default(), occupied: Default::default() }
 
        Self {
 
            data: Default::default(),
 
            vacant: Default::default(),
 
            occupied: Default::default(),
 
            occupied_bit_count: 0,
 
        }
 
    }
 
}
 
impl<T: Debug> Debug for VecStorage<T> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        enum FmtT<'a, T> {
 
            Vacant,
 
@@ -119,24 +126,28 @@ impl<T> VecStorage<T> {
 
        } else {
 
            // 2. Invariant A => reading valid ata
 
            Some(&*self.data.get_unchecked(i).as_ptr())
 
        }
 
    }
 
    //////////////
 
    pub fn len(&self) -> usize {
 
        self.occupied_bit_count
 
    }
 
    pub fn with_reserved_range(range_end: usize) -> Self {
 
        let mut data = Vec::with_capacity(range_end);
 
        unsafe {
 
            // data is uninitialized, as intended
 
            data.set_len(range_end);
 
        }
 
        let bitset_len = (range_end + (usize_bits() - 1)) / usize_bits();
 
        let chunk_iter = std::iter::repeat(0usize).take(bitset_len);
 
        Self {
 
            data,
 
            vacant: Bitvec(chunk_iter.clone().collect()),
 
            occupied: Bitvec(chunk_iter.collect()),
 
            occupied_bit_count: 0,
 
        }
 
    }
 
    pub fn clear(&mut self) {
 
        for i in 0..self.data.len() {
 
            // SAFE: bitvec bounds ensured by invariant E
 
            if unsafe { self.occupied.contains(i) } {
 
@@ -147,12 +158,13 @@ impl<T> VecStorage<T> {
 
                    drop(self.data.get_unchecked_mut(i).as_ptr().read());
 
                }
 
            }
 
        }
 
        self.vacant.0.clear();
 
        self.occupied.0.clear();
 
        self.occupied_bit_count = 0;
 
    }
 
    pub fn iter(&self) -> impl Iterator<Item = &T> {
 
        (0..self.data.len()).filter_map(move |i| unsafe { self.get_occupied_unchecked(i) })
 
    }
 
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
 
        (0..self.data.len()).filter_map(move |i| unsafe {
 
@@ -210,21 +222,23 @@ impl<T> VecStorage<T> {
 
        unsafe {
 
            // 1. invariant C => write is within bounds
 
            // 2. i WAS reserved => no initialized data is being overwritten
 
            self.data.get_unchecked_mut(i).as_mut_ptr().write(t);
 
            self.occupied.insert(i);
 
        };
 
        self.occupied_bit_count += 1;
 
    }
 
    pub fn new_occupied(&mut self, t: T) -> usize {
 
        let i = self.new_reserved();
 
        unsafe {
 
            // 1. invariant C => write is within bounds
 
            // 2. i WAS reserved => no initialized data is being overwritten
 
            self.data.get_unchecked_mut(i).as_mut_ptr().write(t);
 
            self.occupied.insert(i);
 
        };
 
        self.occupied_bit_count += 1;
 
        i
 
    }
 
    pub fn vacate(&mut self, i: usize) -> Option<T> {
 
        // SAFE: bitvec bounds ensured by invariant E
 
        if i >= self.data.len() || unsafe { self.vacant.contains(i) } {
 
            // already vacant. nothing to do here
 
@@ -233,12 +247,13 @@ impl<T> VecStorage<T> {
 
        // i is certainly within bounds of self.data
 
        // SAFE: bitvec bounds ensured by invariant E
 
        let value = if unsafe { self.occupied.remove(i) } {
 
            unsafe {
 
                // 1. index is within bounds
 
                // 2. i is occupied => initialized data is being read
 
                self.occupied_bit_count -= 1;
 
                Some(self.data.get_unchecked_mut(i).as_ptr().read())
 
            }
 
        } else {
 
            // reservations have no data to drop
 
            None
 
        };
src/runtime/mod.rs
Show inline comments
 
@@ -223,12 +223,45 @@ trait Messengerlike {
 
                }
 
                Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed),
 
                Err(PollDeadlineErr::Timeout) => return Ok(None),
 
            }
 
        }
 
    }
 

	
 
    // attempt to receive a message from one of the endpoints before the deadline
 
    fn recv_until(
 
        &mut self,
 
        deadline: Option<Instant>,
 
    ) -> Result<Option<ReceivedMsg>, MessengerRecvErr> {
 
        // try get something buffered
 
        if let Some(x) = self.get_state_mut().undelayed.pop() {
 
            return Ok(Some(x));
 
        }
 

	
 
        loop {
 
            // polled_undrained may not be empty
 
            while let Some(eekey) = self.get_state_mut().polled_undrained.pop() {
 
                if let Some(msg) = self.get_endpoint_mut(eekey).recv()? {
 
                    // this endpoint MAY still have messages! check again in future
 
                    self.get_state_mut().polled_undrained.insert(eekey);
 
                    return Ok(Some(ReceivedMsg { recipient: eekey, msg }));
 
                }
 
            }
 

	
 
            let state = self.get_state_mut();
 
            match state.poll_events_until(deadline) {
 
                Ok(()) => {
 
                    for e in state.events.iter() {
 
                        state.polled_undrained.insert(Key::from_token(e.token()));
 
                    }
 
                }
 
                Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed),
 
                Err(PollDeadlineErr::Timeout) => return Ok(None),
 
            }
 
        }
 
    }
 
}
 

	
 
/////////////////////////////////
 
impl Debug for SolutionStorage {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.pad("Solutions: [")?;
 
@@ -264,13 +297,13 @@ impl<T> Default for Arena<T> {
 
        Self { storage: vec![] }
 
    }
 
}
 
impl<T> Arena<T> {
 
    pub fn alloc(&mut self, t: T) -> Key {
 
        self.storage.push(t);
 
        Key::from_raw(self.storage.len() as u64 - 1)
 
        Key::from_raw(self.storage.len() - 1)
 
    }
 
    pub fn get(&self, key: Key) -> Option<&T> {
 
        self.storage.get(key.to_raw() as usize)
 
    }
 
    pub fn get_mut(&mut self, key: Key) -> Option<&mut T> {
 
        self.storage.get_mut(key.to_raw() as usize)
 
@@ -282,13 +315,13 @@ impl<T> Arena<T> {
 
        self.keyspace().zip(self.storage.iter())
 
    }
 
    pub fn len(&self) -> usize {
 
        self.storage.len()
 
    }
 
    pub fn keyspace(&self) -> impl Iterator<Item = Key> {
 
        (0..(self.storage.len() as u64)).map(Key::from_raw)
 
        (0..self.storage.len()).map(Key::from_raw)
 
    }
 
}
 

	
 
impl ChannelIdStream {
 
    fn new(controller_id: ControllerId) -> Self {
 
        Self { controller_id, next_channel_index: 0 }
 
@@ -305,12 +338,23 @@ impl MessengerState {
 
        use PollDeadlineErr::*;
 
        self.events.clear();
 
        let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
        self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?;
 
        Ok(())
 
    }
 
    fn poll_events_until(&mut self, deadline: Option<Instant>) -> Result<(), PollDeadlineErr> {
 
        use PollDeadlineErr::*;
 
        self.events.clear();
 
        let poll_timeout = if let Some(d) = deadline {
 
            Some(d.checked_duration_since(Instant::now()).ok_or(Timeout)?)
 
        } else {
 
            None
 
        };
 
        self.poll.poll(&mut self.events, poll_timeout).map_err(|_| PollingFailed)?;
 
        Ok(())
 
    }
 
}
 
impl From<PollDeadlineErr> for ConnectErr {
 
    fn from(e: PollDeadlineErr) -> ConnectErr {
 
        match e {
 
            PollDeadlineErr::Timeout => ConnectErr::Timeout,
 
            PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed,
src/test/connector.rs
Show inline comments
 
@@ -79,13 +79,13 @@ primitive fifo_1(msg m, in i, out o) {
 
composite fifo_1_e(in i, out o) {
 
    new fifo_1(null, i, o);
 
}
 
";
 

	
 
#[test]
 
fn connects_ok() {
 
fn connector_connects_ok() {
 
    // Test if we can connect natives using the given PDL
 
    /*
 
    Alice -->silence--P|A-->silence--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
@@ -105,13 +105,13 @@ fn connects_ok() {
 
            x.connect(timeout).unwrap();
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connected_but_silent_natives() {
 
fn connector_connected_but_silent_natives() {
 
    // Test if we can connect natives and have a trivial sync round
 
    /*
 
    Alice -->silence--P|A-->silence--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
@@ -133,13 +133,13 @@ fn connected_but_silent_natives() {
 
            assert_eq!(Ok(0), x.sync(timeout));
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn self_forward_ok() {
 
fn connector_self_forward_ok() {
 
    // Test a deterministic system
 
    // where a native has no network bindings
 
    // and sends messages to itself
 
    /*
 
        /-->\
 
    Alice   forward
 
@@ -163,13 +163,13 @@ fn self_forward_ok() {
 
                assert_eq!(Ok(MSG), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 
#[test]
 
fn token_spout_ok() {
 
fn connector_token_spout_ok() {
 
    // Test a deterministic system where the proto
 
    // creates token messages
 
    /*
 
    Alice<--token_spout
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
@@ -188,13 +188,13 @@ fn token_spout_ok() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn waiter_ok() {
 
fn connector_waiter_ok() {
 
    // Test a stateful proto that blocks port 0 for 10 rounds
 
    // and then sends a single token on the 11th
 
    /*
 
    Alice<--token_spout
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
@@ -214,13 +214,13 @@ fn waiter_ok() {
 
            assert_eq!(Ok(&[] as &[u8]), x.read_gotten(0));
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn self_forward_timeout() {
 
fn connector_self_forward_timeout() {
 
    // Test a deterministic system
 
    // where a native has no network bindings
 
    // and sends messages to itself
 
    /*
 
        /-->\
 
    Alice   forward
 
@@ -241,13 +241,13 @@ fn self_forward_timeout() {
 
            assert_eq!(Err(SyncErr::Timeout), x.sync(timeout));
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn forward_det() {
 
fn connector_forward_det() {
 
    // Test if a deterministic protocol and natives can pass one message
 
    /*
 
    Alice -->forward--P|A-->forward--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
@@ -277,13 +277,13 @@ fn forward_det() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn nondet_proto_det_natives() {
 
fn connector_nondet_proto_det_natives() {
 
    // Test the use of a nondeterministic protocol
 
    // where Alice decides the choice and the others conform
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
@@ -315,13 +315,13 @@ fn nondet_proto_det_natives() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn putter_determines() {
 
fn connector_putter_determines() {
 
    // putter and getter
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
@@ -355,13 +355,13 @@ fn putter_determines() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn getter_determines() {
 
fn connector_getter_determines() {
 
    // putter and getter
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
@@ -396,13 +396,13 @@ fn getter_determines() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn alternator_2() {
 
fn connector_alternator_2() {
 
    // Test a deterministic system which
 
    // alternates sending Sender's messages to A or B
 
    /*                    /--|-->A
 
    Sender -->alternator_2
 
                          \--|-->B
 
    */
 
@@ -463,13 +463,13 @@ fn alternator_2() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn composite_chain_a() {
 
fn connector_composite_chain_a() {
 
    // Check if composition works. Forward messages through long chains
 
    /*
 
    Alice -->sync-->sync-->A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
@@ -502,13 +502,13 @@ fn composite_chain_a() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn composite_chain_b() {
 
fn connector_composite_chain_b() {
 
    // Check if composition works. Forward messages through long chains
 
    /*
 
    Alice -->sync-->sync-->A|P-->sync-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
@@ -541,13 +541,13 @@ fn composite_chain_b() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn exchange() {
 
fn connector_exchange() {
 
    /*
 
        /-->\      /-->P|A-->\      /-->\
 
    Alice   exchange         exchange   Bob
 
        \<--/      \<--P|A<--/      \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
@@ -586,13 +586,13 @@ fn exchange() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn routing_filter() {
 
fn connector_routing_filter() {
 
    // Make a protocol whose behavior is a function of the contents of
 
    // a message. Here, the putter determines what is sent, and the proto
 
    // determines how it is routed
 
    /*
 
    Sender -->filter-->P|A-->sync--> Receiver
 
    */
 
@@ -647,13 +647,13 @@ fn routing_filter() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn fifo_1_e() {
 
fn connector_fifo_1_e() {
 
    /*
 
        /-->\
 
    Alice   fifo_1
 
        \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
0 comments (0 inline, 0 general)