Changeset - 71bf91201a18
[Not reviewed]
0 1 0
Christopher Esterhuyse - 5 years ago 2020-02-21 17:24:08
christopher.esterhuyse@gmail.com
laying out communications with bitmatrix
1 file changed with 8 insertions and 2 deletions:
0 comments (0 inline, 0 general)
src/runtime/experimental/api.rs
Show inline comments
 
@@ -406,389 +406,395 @@ impl Connecting {
 
        // done!
 
        Ok(Connected {
 
            components: Default::default(),
 
            controller_id,
 
            channel_index_stream,
 
            endpoint_exts,
 
            native_ports,
 
            family,
 
            ephemeral: Default::default(),
 
        })
 
    }
 
    /////////
 
    pub fn connect_using_id(
 
        &mut self,
 
        controller_id: ControllerId,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ConnectErr> {
 
        // 1. try and create a connection from these bindings with self immutable.
 
        let connected = self.new_connected(controller_id, timeout)?;
 
        // 2. success! drain self and return
 
        self.bindings.clear();
 
        Ok(connected)
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<Connected, ConnectErr> {
 
        self.connect_using_id(Self::random_controller_id(), timeout)
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_ports: HashSet<Port>,
 
    controller_id: ControllerId,
 
    channel_index_stream: ChannelIndexStream,
 
    endpoint_exts: VecStorage<EndpointExt>,
 
    components: VecStorage<Component>,
 
    family: Family,
 
    ephemeral: Ephemeral,
 
}
 
#[derive(Debug, Default)]
 
struct Ephemeral {
 
    // invariant: between rounds this is cleared
 
    machines: Vec<Machine>,
 
    bit_matrix: BitMatrix,
 
    assignment_to_bit_property: HashMap<(ChannelId, bool), usize>,
 
    usize_buf: Vec<usize>,
 
}
 
impl Ephemeral {
 
    fn clear(&mut self) {
 
        self.bit_matrix = Default::default();
 
        self.usize_buf.clear();
 
        self.machines.clear();
 
        self.assignment_to_bit_property.clear();
 
    }
 
}
 
#[derive(Debug)]
 
struct Machine {
 
    component_index: usize,
 
    state: ProtocolS,
 
}
 
struct MonoCtx<'a> {
 
    another_pass: &'a mut bool,
 
}
 
impl MonoContext for MonoCtx<'_> {
 
    type D = ProtocolD;
 
    type S = ProtocolS;
 

	
 
    fn new_component(&mut self, moved_keys: HashSet<Key>, init_state: Self::S) {
 
        todo!()
 
    }
 
    fn new_channel(&mut self) -> [Key; 2] {
 
        todo!()
 
    }
 
    fn new_random(&mut self) -> u64 {
 
        todo!()
 
    }
 
}
 
impl Connected {
 
    pub fn new_component(
 
        &mut self,
 
        protocol: &Arc<ProtocolD>,
 
        identifier: &Arc<[u8]>,
 
        moved_port_list: &[Port],
 
    ) -> Result<(), MainComponentErr> {
 
        //////////////////////////////////////////
 
        // 1. try and create a new component (without mutating self)
 
        use MainComponentErr::*;
 
        let moved_port_set = {
 
            let mut set: HashSet<Port> = Default::default();
 
            for &port in moved_port_list.iter() {
 
                if !self.native_ports.contains(&port) {
 
                    return Err(CannotMovePort(port));
 
                }
 
                if !set.insert(port) {
 
                    return Err(DuplicateMovedPort(port));
 
                }
 
            }
 
            set
 
        };
 
        // moved_port_set is disjoint to native_ports
 
        let expected_polarities = protocol.component_polarities(identifier)?;
 
        if moved_port_list.len() != expected_polarities.len() {
 
            return Err(WrongNumberOfParamaters { expected: expected_polarities.len() });
 
        }
 
        // correct polarity list
 
        for (param_index, (&port, &expected_polarity)) in
 
            moved_port_list.iter().zip(expected_polarities.iter()).enumerate()
 
        {
 
            let polarity =
 
                self.endpoint_exts.get_occupied(port.0).ok_or(UnknownPort(port))?.info.polarity;
 
            if polarity != expected_polarity {
 
                return Err(WrongPortPolarity { param_index, port });
 
            }
 
        }
 
        let state = Some(protocol.new_main_component(identifier, &moved_port_list));
 
        let component = Component {
 
            port_set: moved_port_set,
 
            protocol: protocol.clone(),
 
            identifier: identifier.clone(),
 
            state,
 
        };
 
        //////////////////////////////
 
        // success! mutate self and return Ok
 
        self.native_ports.retain(|e| !component.port_set.contains(e));
 
        self.components.new_occupied(component);
 
        Ok(())
 
    }
 
    pub fn new_channel(&mut self) -> (OutPort, InPort) {
 
        assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2);
 
        let channel_id = ChannelId {
 
            controller_id: self.controller_id,
 
            channel_index: self.channel_index_stream.next(),
 
        };
 
        let [e0, e1] = Endpoint::new_memory_pair();
 
        let kp = self.endpoint_exts.new_occupied(EndpointExt {
 
            info: EndpointInfo { channel_id, polarity: Putter },
 
            endpoint: e0,
 
        });
 
        let kg = self.endpoint_exts.new_occupied(EndpointExt {
 
            info: EndpointInfo { channel_id, polarity: Getter },
 
            endpoint: e1,
 
        });
 
        (OutPort(Port(kp)), InPort(Port(kg)))
 
    }
 
    pub fn sync_set(&mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs]) -> Result<(), ()> {
 
        // For every component, take its state and make a singleton machine
 
        for (component_index, component) in self.components.iter_mut().enumerate() {
 
            let machine = Machine { component_index, state: component.state.take().unwrap() };
 
            self.ephemeral.machines.push(machine);
 
        }
 
        // Grow property matrix. has |machines| entities and {to_run => 0, to_remove => 1} properties
 
        const PROP_TO_RUN: usize = 0;
 
        const PROP_TO_REMOVE: usize = 1;
 
        self.ephemeral
 
            .bit_matrix
 
            .grow_to(Pair { property: 2, entity: self.ephemeral.machines.len() as u32 });
 
        // Set to_run property for all existing machines
 
        self.ephemeral.bit_matrix.batch_mut(move |p| p[PROP_TO_RUN] = TRUE_CHUNK);
 

	
 
        /////////////
 
        // perform mono runs, adding and removing TO_RUN property bits
 
        let mut usize_buf = vec![];
 
        let mut another_pass = true;
 
        while another_pass {
 
            another_pass = false;
 
            let machine_index_iter = self
 
                .ephemeral
 
                .bit_matrix
 
                .iter_entities_where(&mut usize_buf, move |p| p[PROP_TO_RUN]);
 
            for machine_index in machine_index_iter {
 
                let machine = &mut self.ephemeral.machines[machine_index as usize];
 
                let component = self.components.get_occupied(machine.component_index).unwrap();
 
                let mut ctx = MonoCtx { another_pass: &mut another_pass };
 
                match machine.state.pre_sync_run(&mut ctx, &component.protocol) {
 
                    MonoBlocker::Inconsistent => todo!(),
 
                    MonoBlocker::ComponentExit => self
 
                        .ephemeral
 
                        .bit_matrix
 
                        .set(Pair { entity: machine_index, property: PROP_TO_REMOVE as u32 }),
 
                    MonoBlocker::SyncBlockStart => self
 
                        .ephemeral
 
                        .bit_matrix
 
                        .unset(Pair { entity: machine_index, property: PROP_TO_RUN as u32 }),
 
                }
 
            }
 
        }
 
        // no machines have property TO_RUN
 

	
 
        // from back to front, swap_remove all machines with PROP_TO_REMOVE
 
        let machine_index_iter = self
 
            .ephemeral
 
            .bit_matrix
 
            .iter_entities_where_rev(&mut usize_buf, move |p| p[PROP_TO_REMOVE]);
 
        self.ephemeral.bit_matrix = Default::default(); // clear matrix
 
        for machine_index in machine_index_iter {
 
            self.ephemeral.machines.swap_remove(machine_index as usize);
 
            let machine = self.ephemeral.machines.swap_remove(machine_index as usize);
 
            drop(machine);
 
        }
 

	
 
        // from now on, the number
 
        let matrix_bounds = Pair { entity: self.ephemeral.machines.len() as u32 * 2, property: 8 };
 
        self.ephemeral.bit_matrix = BitMatrix::new(matrix_bounds); // clear propertties
 

	
 
        // !!! TODO poly run until solution is found
 

	
 
        // logically destructure self so we can read and write to different fields interleaved...
 
        let solution_assignments: Vec<(ChannelId, bool)> = vec![];
 
        let Self {
 
            components,
 
            ephemeral: Ephemeral { bit_matrix, assignment_to_bit_property, usize_buf, machines },
 
            ..
 
        } = self;
 

	
 
        // !!!!!!! TODO MORE HERE
 

	
 
        let machine_index_iter = bit_matrix.iter_entities_where(usize_buf, move |p| {
 
            solution_assignments.iter().fold(TRUE_CHUNK, |chunk, assignment| {
 
                let &bit_property = assignment_to_bit_property.get(assignment).unwrap();
 
                chunk & p[bit_property]
 
            })
 
        });
 
        for machine_index in machine_index_iter {
 
            let machine = &machines[machine_index as usize];
 
            let component = &mut components.get_occupied_mut(machine.component_index).unwrap();
 
            component.state = Some(machine.state.clone());
 
            println!("visiting machine at index {:?}", machine_index);
 
        }
 
        self.ephemeral.clear();
 
        println!("B {:#?}", self);
 
        Ok(())
 
    }
 
    pub fn sync_subsets(
 
        &mut self,
 
        _inbuf: &mut [u8],
 
        _ops: &mut [PortOpRs],
 
        bit_subsets: &[&[usize]],
 
    ) -> Result<usize, ()> {
 
        for (batch_index, bit_subset) in bit_subsets.iter().enumerate() {
 
            println!("batch_index {:?}", batch_index);
 
            let chunk_iter = bit_subset.iter().copied();
 
            for index in BitChunkIter::new(chunk_iter) {
 
                println!("  index {:?}", index);
 
            }
 
        }
 
        Ok(0)
 
    }
 
}
 

	
 
macro_rules! bitslice {
 
    ($( $num:expr  ),*) => {{
 
        &[0 $( | (1usize << $num)  )*]
 
    }};
 
}
 

	
 
#[test]
 
fn api_new_test() {
 
    let mut c = Connecting::default();
 
    let net_out: OutPort = c.bind(Coupling::Active, "127.0.0.1:8000".parse().unwrap());
 
    let net_in: InPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap());
 
    let proto_0 = Arc::new(ProtocolD::parse(b"").unwrap());
 
    let mut c = c.connect(None).unwrap();
 
    let (mem_out, mem_in) = c.new_channel();
 
    let mut inbuf = [0u8; 64];
 
    let identifier: Arc<[u8]> = b"sync".to_vec().into();
 
    c.new_component(&proto_0, &identifier, &[net_in.into(), mem_out.into()]).unwrap();
 
    let mut ops = [
 
        PortOpRs::In { msg_range: None, port: &mem_in },
 
        PortOpRs::Out { msg: b"hey", port: &net_out, optional: false },
 
        PortOpRs::Out { msg: b"hi?", port: &net_out, optional: true },
 
        PortOpRs::Out { msg: b"yo!", port: &net_out, optional: false },
 
    ];
 
    c.sync_set(&mut inbuf, &mut ops).unwrap();
 
    c.sync_subsets(&mut inbuf, &mut ops, &[bitslice! {0,1,2}]).unwrap();
 
}
 

	
 
#[repr(C)]
 
pub struct PortOp {
 
    msgptr: *mut u8, // read if OUT, field written if IN, will point into buf
 
    msglen: usize,   // read if OUT, written if IN, won't exceed buf
 
    port: Port,
 
    optional: bool, // no meaning if
 
}
 

	
 
pub enum PortOpRs<'a> {
 
    In { msg_range: Option<Range<usize>>, port: &'a InPort },
 
    Out { msg: &'a [u8], port: &'a OutPort, optional: bool },
 
}
 

	
 
unsafe fn c_sync_set(
 
    connected: &mut Connected,
 
    inbuflen: usize,
 
    inbufptr: *mut u8,
 
    opslen: usize,
 
    opsptr: *mut PortOp,
 
) -> i32 {
 
    let buf = as_mut_slice(inbuflen, inbufptr);
 
    let ops = as_mut_slice(opslen, opsptr);
 
    let (subset_index, wrote) = sync_inner(connected, buf);
 
    assert_eq!(0, subset_index);
 
    for op in ops {
 
        if let Some(range) = wrote.get(&op.port) {
 
            op.msgptr = inbufptr.add(range.start);
 
            op.msglen = range.end - range.start;
 
        }
 
    }
 
    0
 
}
 

	
 
unsafe fn c_sync_subset(
 
    connected: &mut Connected,
 
    inbuflen: usize,
 
    inbufptr: *mut u8,
 
    opslen: usize,
 
    opsptr: *mut PortOp,
 
    subsetslen: usize,
 
    subsetsptr: *const *const usize,
 
) -> i32 {
 
    let buf: &mut [u8] = as_mut_slice(inbuflen, inbufptr);
 
    let ops: &mut [PortOp] = as_mut_slice(opslen, opsptr);
 
    let subsets: &[*const usize] = as_const_slice(subsetslen, subsetsptr);
 
    let subsetlen = usizes_for_bits(opslen);
 
    // don't yet know subsetptr; which subset fires unknown!
 

	
 
    let (subset_index, wrote) = sync_inner(connected, buf);
 
    let subsetptr: *const usize = subsets[subset_index];
 
    let subset: &[usize] = as_const_slice(subsetlen, subsetptr);
 

	
 
    for index in BitChunkIter::new(subset.iter().copied()) {
 
        let op = &mut ops[index as usize];
 
        if let Some(range) = wrote.get(&op.port) {
 
            op.msgptr = inbufptr.add(range.start);
 
            op.msglen = range.end - range.start;
 
        }
 
    }
 
    subset_index as i32
 
}
 

	
 
// dummy fn for the actual synchronous round
 
fn sync_inner<'c, 'b>(
 
    _connected: &'c mut Connected,
 
    _buf: &'b mut [u8],
 
) -> (usize, &'b HashMap<Port, Range<usize>>) {
 
    todo!()
 
}
 

	
 
unsafe fn as_mut_slice<'a, T>(len: usize, ptr: *mut T) -> &'a mut [T] {
 
    std::slice::from_raw_parts_mut(ptr, len)
 
}
 
unsafe fn as_const_slice<'a, T>(len: usize, ptr: *const T) -> &'a [T] {
 
    std::slice::from_raw_parts(ptr, len)
 
}
 

	
 
#[test]
 
fn api_connecting() {
 
    let addrs: [SocketAddr; 3] = [
 
        "127.0.0.1:8888".parse().unwrap(),
 
        "127.0.0.1:8889".parse().unwrap(),
 
        "127.0.0.1:8890".parse().unwrap(),
 
    ];
 

	
 
    lazy_static::lazy_static! {
 
        static ref PROTOCOL: Arc<ProtocolD> = {
 
            static PDL: &[u8] = b"
 
            primitive sync(in i, out o) {
 
                while(true) synchronous {
 
                    put(o, get(i));
 
                }
 
            }
 
            ";
 
            Arc::new(ProtocolD::parse(PDL).unwrap())
 
        };
 
    }
 

	
 
    const TIMEOUT: Option<Duration> = Some(Duration::from_secs(1));
 
    let handles = vec![
 
        std::thread::spawn(move || {
 
            let mut c = Connecting::default();
 
            let p_in: InPort = c.bind(Coupling::Passive, addrs[0]);
 
            let p_out: OutPort = c.bind(Coupling::Active, addrs[1]);
 
            let mut c = c.connect(TIMEOUT).unwrap();
 
            println!("c {:#?}", &c);
 

	
 
            let identifier = b"sync".to_vec().into();
 
            c.new_component(&PROTOCOL, &identifier, &[p_in.into(), p_out.into()]).unwrap();
 
            println!("c {:#?}", &c);
 

	
 
            let mut inbuf = vec![];
 
            let mut port_ops = [];
 
            c.sync_set(&mut inbuf, &mut port_ops).unwrap();
 
        }),
 
        std::thread::spawn(move || {
 
            let mut connecting = Connecting::default();
 
            let _a: OutPort = connecting.bind(Coupling::Active, addrs[0]);
 
            let _b: InPort = connecting.bind(Coupling::Passive, addrs[1]);
 
            let _c: InPort = connecting.bind(Coupling::Active, addrs[2]);
 
            let _connected = connecting.connect(TIMEOUT).unwrap();
 
        }),
0 comments (0 inline, 0 general)