Changeset - adb23c484182
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-02-21 13:00:16
christopher.esterhuyse@gmail.com
component and port storages on 2 axes
3 files changed with 80 insertions and 35 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -41,24 +41,29 @@ pub enum Polarity {
 
    Getter, // input port (from the perspective of the component)
 
}
 

	
 
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone, Debug)]
 
#[repr(C)]
 
pub struct Port(pub usize); // ports are COPY
 
pub type Key = Port;
 

	
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum MainComponentErr {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(Port),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(Port),
 
    WrongPortPolarity { param_index: usize, port: Port },
 
    DuplicateMovedPort(Port),
 
}
 
pub trait ProtocolDescription: Sized {
 
    type S: ComponentState<D = Self>;
 

	
 
    fn parse(pdl: &[u8]) -> Result<Self, String>;
 
    fn component_polarities(&self, identifier: &[u8]) -> Result<Vec<Polarity>, MainComponentErr>;
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Key]) -> Self::S;
 
}
 

	
 
pub trait ComponentState: Sized + Clone {
 
    type D: ProtocolDescription;
 
    fn pre_sync_run<C: MonoContext<D = Self::D, S = Self>>(
src/runtime/errors.rs
Show inline comments
 
@@ -79,15 +79,16 @@ pub enum EvalErr {
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum MessengerRecvErr {
 
    PollingFailed,
 
    EndpointErr(Port, EndpointErr),
 
}
 
impl From<MainComponentErr> for ConfigErr {
 
    fn from(e: MainComponentErr) -> Self {
 
        use ConfigErr as C;
 
        use MainComponentErr as M;
 
        match e {
 
            M::NoSuchComponent => C::NoSuchComponent,
 
            M::NonPortTypeParameters => C::NonPortTypeParameters,
 
            _ => todo!(),
 
        }
 
    }
 
}
src/runtime/experimental/api.rs
Show inline comments
 
use super::vec_storage::VecStorage;
 
use crate::common::*;
 
use crate::runtime::endpoint::EndpointExt;
 
use crate::runtime::endpoint::EndpointInfo;
 
use crate::runtime::endpoint::{Endpoint, Msg, SetupMsg};
 
use crate::runtime::errors::EndpointErr;
 
use crate::runtime::errors::MessengerRecvErr;
 
use crate::runtime::errors::PollDeadlineErr;
 
use crate::runtime::MessengerState;
 
use crate::runtime::Messengerlike;
 
use crate::runtime::ReceivedMsg;
 
use crate::runtime::{ProtocolD, ProtocolS};
 

	
 
use std::net::SocketAddr;
 
use std::sync::Arc;
 

	
 
pub enum Coupling {
 
    Active,
 
    Passive,
 
}
 

	
 
#[derive(Debug)]
 
struct Family {
 
    parent: Option<Port>,
 
@@ -82,24 +83,33 @@ impl Binds<OutPort> for Connecting {
 
#[derive(Debug, Clone)]
 
pub enum ConnectErr {
 
    BindErr(SocketAddr),
 
    NewSocketErr(SocketAddr),
 
    AcceptErr(SocketAddr),
 
    ConnectionShutdown(SocketAddr),
 
    PortKindMismatch(SocketAddr),
 
    EndpointErr(Port, EndpointErr),
 
    PollInitFailed,
 
    PollingFailed,
 
    Timeout,
 
}
 

	
 
#[derive(Debug)]
 
struct Component {
 
    protocol: Arc<ProtocolD>,
 
    port_set: HashSet<Port>,
 
    identifier: Arc<[u8]>,
 
    state: ProtocolS,
 
}
 

	
 
impl From<PollDeadlineErr> for ConnectErr {
 
    fn from(e: PollDeadlineErr) -> Self {
 
        use PollDeadlineErr as P;
 
        match e {
 
            P::PollingFailed => Self::PollingFailed,
 
            P::Timeout => Self::Timeout,
 
        }
 
    }
 
}
 
impl From<MessengerRecvErr> for ConnectErr {
 
    fn from(e: MessengerRecvErr) -> Self {
 
        use MessengerRecvErr as M;
 
@@ -119,29 +129,29 @@ impl Connecting {
 
            // 1. All random bytes give valid Bytes8
 
            // 2. Bytes8 and ControllerId have same valid representations
 
            std::mem::transmute::<Bytes8, ControllerId>(bytes)
 
        }
 
    }
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
        use std::io::Write;
 
        stream.write(&[]).is_ok()
 
    }
 
    fn new_connected(
 
        &self,
 
        controller_id: ControllerId,
 
        protocol: &Arc<Protocol>,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        ///////////////////////////////////////////////////////
 
        // 1. bindings correspond with ports 0..bindings.len(). For each:
 
        //    - reserve a slot in endpoint_exts.
 
        //    - store the port in `native_ports' set.
 
        let mut endpoint_exts = VecStorage::<EndpointExt>::with_reserved_range(self.bindings.len());
 
        let native_ports = (0..self.bindings.len()).map(Port).collect();
 

	
 
        // 2. create MessengerState structure for polling channels
 
        let edge = PollOpt::edge();
 
        let [ready_r, ready_w] = [Ready::readable(), Ready::writable()];
 
        let mut ms =
 
            MessengerState::with_event_capacity(self.bindings.len()).map_err(|_| PollInitFailed)?;
 

	
 
@@ -247,24 +257,25 @@ impl Connecting {
 
                                break 'recv_loop;
 
                            } else {
 
                                ms.delayed.push(ReceivedMsg { recipient: ekey, msg });
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        assert_eq!(None, endpoint_exts.iter_reserved().next());
 
        drop(todos);
 

	
 
        ///////////////////////////////////////////////////////
 
        // 1. construct `family', i.e. perform the sink tree setup procedure
 
        use {Msg::SetupMsg as S, SetupMsg::*};
 
        let mut messenger = (&mut ms, &mut endpoint_exts);
 
        impl Messengerlike for (&mut MessengerState, &mut VecStorage<EndpointExt>) {
 
            fn get_state_mut(&mut self) -> &mut MessengerState {
 
                self.0
 
            }
 
            fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint {
 
                &mut self
 
                    .1
 
                    .get_occupied_mut(ekey.to_raw() as usize)
 
                    .expect("OUT OF BOUNDS")
 
@@ -382,98 +393,127 @@ impl Connecting {
 
                }
 
                S(SetupMsg::LeaderAnnounce { leader }) => {
 
                    assert!(awaiting.remove(&recipient));
 
                    assert!(leader == my_leader);
 
                    assert!(Some(recipient) != parent);
 
                    // they wouldn't send me this if they considered me their parent
 
                }
 
                _ => messenger.delay(ReceivedMsg { recipient, msg }),
 
            }
 
        }
 
        let family = Family { parent, children };
 

	
 
        // 1. done! return
 
        // done!
 
        Ok(Connected {
 
            components: Default::default(),
 
            controller_id,
 
            channel_index_stream,
 
            protocol: protocol.clone(),
 
            endpoint_exts,
 
            native_ports,
 
            family,
 
        })
 
    }
 
    /////////
 
    pub fn connect_using_id(
 
        &mut self,
 
        controller_id: ControllerId,
 
        protocol: &Arc<Protocol>,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ConnectErr> {
 
        // 1. try and create a connection from these bindings with self immutable.
 
        let connected = self.new_connected(controller_id, protocol, timeout)?;
 
        let connected = self.new_connected(controller_id, timeout)?;
 
        // 2. success! drain self and return
 
        self.bindings.clear();
 
        Ok(connected)
 
    }
 
    pub fn connect(
 
        &mut self,
 
        protocol: &Arc<Protocol>,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ConnectErr> {
 
        self.connect_using_id(Self::random_controller_id(), protocol, timeout)
 
    }
 
}
 
#[derive(Debug)]
 
pub struct Protocol;
 
impl Protocol {
 
    pub fn parse(_pdl_text: &[u8]) -> Result<Self, ()> {
 
        Ok(Protocol)
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<Connected, ConnectErr> {
 
        self.connect_using_id(Self::random_controller_id(), timeout)
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_ports: HashSet<Port>,
 
    controller_id: ControllerId,
 
    channel_index_stream: ChannelIndexStream,
 
    endpoint_exts: VecStorage<EndpointExt>,
 
    protocol: Arc<Protocol>,
 
    components: VecStorage<Component>,
 
    family: Family,
 
}
 
impl Connected {
 
    pub fn new_component(
 
        &mut self,
 
        protocol: &Arc<ProtocolD>,
 
        identifier: &Arc<[u8]>,
 
        moved_port_list: &[Port],
 
    ) -> Result<(), MainComponentErr> {
 
        //////////////////////////////////////////
 
        // 1. try and create a new component (without mutating self)
 
        use MainComponentErr::*;
 
        let moved_port_set = {
 
            let mut set: HashSet<Port> = Default::default();
 
            for &port in moved_port_list.iter() {
 
                if !self.native_ports.contains(&port) {
 
                    return Err(CannotMovePort(port));
 
                }
 
                if !set.insert(port) {
 
                    return Err(DuplicateMovedPort(port));
 
                }
 
            }
 
            set
 
        };
 
        // moved_port_set is disjoint to native_ports
 
        let expected_polarities = protocol.component_polarities(identifier)?;
 
        if moved_port_list.len() != expected_polarities.len() {
 
            return Err(WrongNumberOfParamaters { expected: expected_polarities.len() });
 
        }
 
        // correct polarity list
 
        for (param_index, (&port, &expected_polarity)) in
 
            moved_port_list.iter().zip(expected_polarities.iter()).enumerate()
 
        {
 
            let polarity =
 
                self.endpoint_exts.get_occupied(port.0).ok_or(UnknownPort(port))?.info.polarity;
 
            if polarity != expected_polarity {
 
                return Err(WrongPortPolarity { param_index, port });
 
            }
 
        }
 
        let state = protocol.new_main_component(identifier, &moved_port_list);
 
        let component = Component {
 
            port_set: moved_port_set,
 
            protocol: protocol.clone(),
 
            identifier: identifier.clone(),
 
            state,
 
        };
 
        //////////////////////////////
 
        // success! mutate self and return Ok
 
        self.native_ports.retain(|e| !component.port_set.contains(e));
 
        self.components.new_occupied(component);
 
        Ok(())
 
    }
 
    pub fn new_channel(&mut self) -> (OutPort, InPort) {
 
        assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2);
 
        let channel_id = ChannelId {
 
            controller_id: self.controller_id,
 
            channel_index: self.channel_index_stream.next(),
 
        };
 
        let [e0, e1] = Endpoint::new_memory_pair();
 
        let kp = self.endpoint_exts.new_occupied(EndpointExt {
 
            info: EndpointInfo { channel_id, polarity: Putter },
 
            endpoint: e0,
 
        });
 
        let kg = self.endpoint_exts.new_occupied(EndpointExt {
 
            info: EndpointInfo { channel_id, polarity: Getter },
 
            endpoint: e1,
 
        });
 
        (OutPort(Port(kp)), InPort(Port(kg)))
 
    }
 
    pub fn new_component(&mut self, _name: Vec<u8>, moved_ports: &[Port]) -> Result<(), ()> {
 
        let moved_ports = moved_ports.iter().copied().collect();
 
        if !self.native_ports.is_superset(&moved_ports) {
 
            return Err(());
 
        }
 
        self.native_ports.retain(|e| !moved_ports.contains(e));
 
        // self.components.push(ComponentExt { ports: moved_ports, protocol: protocol.clone(), name });
 
        todo!()
 
    }
 
    pub fn sync_set(&mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs]) -> Result<(), ()> {
 
        Ok(())
 
    }
 
    pub fn sync_subsets(
 
        &mut self,
 
        _inbuf: &mut [u8],
 
        _ops: &mut [PortOpRs],
 
        bit_subsets: &[&[usize]],
 
    ) -> Result<usize, ()> {
 
        for (batch_index, bit_subset) in bit_subsets.iter().enumerate() {
 
            println!("batch_index {:?}", batch_index);
 
            let chunk_iter = bit_subset.iter().copied();
 
@@ -487,29 +527,30 @@ impl Connected {
 

	
 
macro_rules! bitslice {
 
    ($( $num:expr  ),*) => {{
 
        &[0 $( | (1usize << $num)  )*]
 
    }};
 
}
 

	
 
#[test]
 
fn api_new_test() {
 
    let mut c = Connecting::default();
 
    let net_out: OutPort = c.bind(Coupling::Active, "127.0.0.1:8000".parse().unwrap());
 
    let net_in: InPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap());
 
    let proto_0 = Arc::new(Protocol::parse(b"").unwrap());
 
    let mut c = c.connect(&proto_0, None).unwrap();
 
    let proto_0 = Arc::new(ProtocolD::parse(b"").unwrap());
 
    let mut c = c.connect(None).unwrap();
 
    let (mem_out, mem_in) = c.new_channel();
 
    let mut inbuf = [0u8; 64];
 
    c.new_component(b"sync".to_vec(), &[net_in.into(), mem_out.into()]).unwrap();
 
    let identifier: Arc<[u8]> = b"sync".to_vec().into();
 
    c.new_component(&proto_0, &identifier, &[net_in.into(), mem_out.into()]).unwrap();
 
    let mut ops = [
 
        PortOpRs::In { msg_range: None, port: &mem_in },
 
        PortOpRs::Out { msg: b"hey", port: &net_out, optional: false },
 
        PortOpRs::Out { msg: b"hi?", port: &net_out, optional: true },
 
        PortOpRs::Out { msg: b"yo!", port: &net_out, optional: false },
 
    ];
 
    c.sync_set(&mut inbuf, &mut ops).unwrap();
 
    c.sync_subsets(&mut inbuf, &mut ops, &[bitslice! {0,1,2}]).unwrap();
 
}
 

	
 
#[repr(C)]
 
pub struct PortOp {
 
@@ -587,32 +628,30 @@ unsafe fn as_mut_slice<'a, T>(len: usize, ptr: *mut T) -> &'a mut [T] {
 
}
 
unsafe fn as_const_slice<'a, T>(len: usize, ptr: *const T) -> &'a [T] {
 
    std::slice::from_raw_parts(ptr, len)
 
}
 

	
 
#[test]
 
fn api_connecting() {
 
    let addrs: [SocketAddr; 3] = [
 
        "127.0.0.1:8888".parse().unwrap(),
 
        "127.0.0.1:8889".parse().unwrap(),
 
        "127.0.0.1:8890".parse().unwrap(),
 
    ];
 
    let protocol1 = Arc::new(Protocol::parse(b"").unwrap());
 
    let protocol2 = protocol1.clone();
 
    let handles = vec![
 
        std::thread::spawn(move || {
 
            let mut connecting = Connecting::default();
 
            let _a: OutPort = connecting.bind(Coupling::Active, addrs[0]);
 
            let connected = connecting.connect(&protocol1, None);
 
            let connected = connecting.connect(None);
 
            println!("A: {:#?}", connected);
 
        }),
 
        std::thread::spawn(move || {
 
            let mut connecting = Connecting::default();
 
            let _a: OutPort = connecting.bind(Coupling::Passive, addrs[0]);
 
            let connected = connecting.connect(&protocol2, Some(Duration::from_secs(2)));
 
            let connected = connecting.connect(Some(Duration::from_secs(2)));
 
            println!("B: {:#?}", connected);
 
        }),
 
    ];
 
    for h in handles {
 
        h.join().unwrap();
 
    }
 
}
0 comments (0 inline, 0 general)