Changeset - 02eb59c6fd66
[Not reviewed]
0 7 0
Christopher Esterhuyse - 5 years ago 2020-05-18 08:59:28
christopher.esterhuyse@gmail.com
introduced new Payload type
7 files changed with 77 insertions and 50 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -24,10 +24,13 @@ pub use Polarity::*;
 

	
 
///////////////////// DEFS /////////////////////
 

	
 
pub type Payload = Vec<u8>;
 
// pub type Payload = Vec<u8>;
 
pub type ControllerId = u32;
 
pub type ChannelIndex = u32;
 

	
 
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
pub struct Payload(Vec<u8>);
 

	
 
/// This is a unique identifier for a channel (i.e., port).
 
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]
 
pub struct ChannelId {
 
@@ -111,6 +114,34 @@ pub trait PolyContext {
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl Payload {
 
    pub fn new(len: usize) -> Payload {
 
        let mut v = Vec::with_capacity(len);
 
        unsafe {
 
            v.set_len(len);
 
        }
 
        Payload(v)
 
    }
 
    pub fn len(&self) -> usize {
 
        self.0.len()
 
    }
 
    pub fn as_slice(&self) -> &[u8] {
 
        &self.0
 
    }
 
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
 
        &mut self.0
 
    }
 
}
 
impl std::iter::FromIterator<u8> for Payload {
 
    fn from_iter<I: IntoIterator<Item = u8>>(it: I) -> Self {
 
        Self(it.into_iter().collect())
 
    }
 
}
 
impl From<Vec<u8>> for Payload {
 
    fn from(s: Vec<u8>) -> Self {
 
        Self(s.into())
 
    }
 
}
 
impl Debug for Port {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "Port({})", self.0)
src/protocol/eval.rs
Show inline comments
 
@@ -48,7 +48,7 @@ pub enum Value {
 
    LongArray(LongArrayValue),
 
}
 
impl Value {
 
    pub fn receive_message(buffer: &Vec<u8>) -> Value {
 
    pub fn receive_message(buffer: &Payload) -> Value {
 
        Value::Message(MessageValue(Some(buffer.clone())))
 
    }
 
    fn create_message(length: Value) -> Value {
 
@@ -59,7 +59,7 @@ impl Value {
 
                    // Only messages within the expected length are allowed
 
                    Value::Message(MessageValue(None))
 
                } else {
 
                    Value::Message(MessageValue(Some(vec![0; length.try_into().unwrap()])))
 
                    Value::Message(MessageValue(Some(Payload::new(0))))
 
                }
 
            }
 
            _ => unimplemented!(),
 
@@ -108,12 +108,12 @@ impl Value {
 
                // It is inconsistent to update the null message
 
                None
 
            }
 
            (Value::Message(MessageValue(Some(buffer))), Value::Byte(ByteValue(b))) => {
 
            (Value::Message(MessageValue(Some(payload))), Value::Byte(ByteValue(b))) => {
 
                if *b < 0 {
 
                    // It is inconsistent to update with a negative value
 
                    return None;
 
                }
 
                if let Some(slot) = buffer.get_mut(the_index) {
 
                if let Some(slot) = payload.as_mut_slice().get_mut(the_index) {
 
                    *slot = (*b).try_into().unwrap();
 
                    Some(value.clone())
 
                } else {
 
@@ -121,12 +121,12 @@ impl Value {
 
                    None
 
                }
 
            }
 
            (Value::Message(MessageValue(Some(buffer))), Value::Short(ShortValue(b))) => {
 
            (Value::Message(MessageValue(Some(payload))), Value::Short(ShortValue(b))) => {
 
                if *b < 0 || *b > BYTE_MAX as i16 {
 
                    // It is inconsistent to update with a negative value or a too large value
 
                    return None;
 
                }
 
                if let Some(slot) = buffer.get_mut(the_index) {
 
                if let Some(slot) = payload.as_mut_slice().get_mut(the_index) {
 
                    *slot = (*b).try_into().unwrap();
 
                    Some(value.clone())
 
                } else {
 
@@ -165,8 +165,8 @@ impl Value {
 
                // It is inconsistent to read from the null message
 
                None
 
            }
 
            Value::Message(MessageValue(Some(buffer))) => {
 
                if let Some(slot) = buffer.get(the_index) {
 
            Value::Message(MessageValue(Some(payload))) => {
 
                if let Some(slot) = payload.as_slice().get(the_index) {
 
                    Some(Value::Short(ShortValue((*slot).try_into().unwrap())))
 
                } else {
 
                    // It is inconsistent to update out of bounds
 
@@ -936,27 +936,19 @@ impl ValueImpl for OutputValue {
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct MessageValue(pub Option<Vec<u8>>);
 
pub struct MessageValue(pub Option<Payload>);
 

	
 
impl Display for MessageValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        match &self.0 {
 
            None => write!(f, "null"),
 
            Some(vec) => {
 
                write!(f, "#msg({};", vec.len())?;
 
                let mut i = 0;
 
                for v in vec.iter() {
 
                    if i > 0 {
 
                        write!(f, ",")?;
 
                    }
 
                    write!(f, "{}", v)?;
 
                    i += 1;
 
                    if i >= 10 {
 
                        write!(f, ",...")?;
 
                        break;
 
                    }
 
            Some(payload) => {
 
                // format print up to 10 bytes
 
                let mut slice = payload.as_slice();
 
                if slice.len() > 10 {
 
                    slice = &slice[..10];
 
                }
 
                write!(f, ")")
 
                f.debug_list().entries(slice.iter().copied()).finish()
 
            }
 
        }
 
    }
src/protocol/mod.rs
Show inline comments
 
@@ -228,8 +228,12 @@ impl EvalContext<'_> {
 
                let mut moved_keys = HashSet::new();
 
                for arg in args.iter() {
 
                    match arg {
 
                        Value::Output(OutputValue(key)) => { moved_keys.insert(*key); }
 
                        Value::Input(InputValue(key)) => { moved_keys.insert(*key); }
 
                        Value::Output(OutputValue(key)) => {
 
                            moved_keys.insert(*key);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            moved_keys.insert(*key);
 
                        }
 
                        _ => {}
 
                    }
 
                }
 
@@ -247,7 +251,7 @@ impl EvalContext<'_> {
 
                let to = Value::Input(InputValue(to));
 
                return [from, to];
 
            }
 
            EvalContext::Poly(_) => unreachable!()
 
            EvalContext::Poly(_) => unreachable!(),
 
        }
 
    }
 
    fn fires(&mut self, port: Value) -> Option<Value> {
src/runtime/connector.rs
Show inline comments
 
@@ -182,6 +182,6 @@ impl Connector {
 
        }
 
        let result = connected.controller.inner.mono_n.result.as_ref().ok_or(NoPreviousRound)?;
 
        let payload = result.1.get(&key).ok_or(DidNotGet)?;
 
        Ok(payload)
 
        Ok(payload.as_slice())
 
    }
 
}
src/runtime/ffi.rs
Show inline comments
 
@@ -258,9 +258,9 @@ pub unsafe extern "C" fn connector_put(
 
    msg_len: c_uint,
 
) -> c_int {
 
    let buf = std::slice::from_raw_parts_mut(buf_ptr, msg_len.try_into().unwrap());
 
    let payload = buf.to_vec(); // unsafe
 
    let vec: Vec<u8> = buf.to_vec(); // unsafe
 
    let mut b = Box::from_raw(connector); // unsafe!
 
    let ret = b.put(proto_port_index.try_into().unwrap(), payload);
 
    let ret = b.put(proto_port_index.try_into().unwrap(), vec.into());
 
    Box::into_raw(b); // don't drop!
 
    match ret {
 
        Ok(()) => 0,
src/runtime/serde.rs
Show inline comments
 
@@ -119,7 +119,7 @@ impl<R: Read> De<u64> for R {
 
impl<W: Write> Ser<Payload> for W {
 
    fn ser(&mut self, t: &Payload) -> Result<(), std::io::Error> {
 
        self.ser(&VarLenInt(t.len() as u64))?;
 
        for byte in t {
 
        for byte in t.as_slice() {
 
            self.ser(byte)?;
 
        }
 
        Ok(())
 
@@ -132,7 +132,7 @@ impl<R: Read> De<Payload> for R {
 
        for _ in 0..len {
 
            x.push(self.de()?);
 
        }
 
        Ok(x)
 
        Ok(x.into())
 
    }
 
}
 

	
src/test/connector.rs
Show inline comments
 
@@ -178,7 +178,7 @@ fn connector_self_forward_ok() {
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(1));
 
@@ -257,7 +257,7 @@ fn connector_self_forward_timeout() {
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            x.put(0, MSG.to_vec()).unwrap();
 
            x.put(0, MSG.to_vec().into()).unwrap();
 
            // native and forward components cannot find a solution
 
            assert_eq!(Err(SyncErr::Timeout), x.sync(timeout));
 
        },
 
@@ -282,7 +282,7 @@ fn connector_forward_det() {
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
@@ -319,7 +319,7 @@ fn connector_nondet_proto_det_natives() {
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
@@ -357,7 +357,7 @@ fn connector_putter_determines() {
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
@@ -398,7 +398,7 @@ fn connector_getter_determines() {
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                // batches [{0=>?}, {0=>*}]
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                x.next_batch().unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
@@ -443,7 +443,7 @@ fn connector_alternator_2() {
 

	
 
            for _ in 0..N {
 
                for _ in 0..2 {
 
                    x.put(0, MSG.to_vec()).unwrap();
 
                    x.put(0, MSG.to_vec().into()).unwrap();
 
                    assert_eq!(0, x.sync(timeout).unwrap());
 
                }
 
            }
 
@@ -505,7 +505,7 @@ fn connector_composite_chain_a() {
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
@@ -544,7 +544,7 @@ fn connector_composite_chain_b() {
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
@@ -585,7 +585,7 @@ fn connector_exchange() {
 
            x.bind_port(3, Passive(addrs[1])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(1));
 
@@ -600,7 +600,7 @@ fn connector_exchange() {
 
            x.bind_port(3, Active(addrs[0])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(1));
 
@@ -629,8 +629,8 @@ fn connector_both() {
 
            x.bind_port(3, Passive(addrs[1])).unwrap(); // peer out b
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"one".to_vec()));
 
                assert_eq!(Ok(()), x.put(1, b"two".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"one".to_vec().into()));
 
                assert_eq!(Ok(()), x.put(1, b"two".to_vec().into()));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
@@ -734,7 +734,7 @@ fn connector_fifo_1_e() {
 

	
 
            for _ in 0..N {
 
                // put
 
                assert_eq!(Ok(()), x.put(0, b"message~".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"message~".to_vec().into()));
 
                assert_eq!(Ok(0), x.sync(timeout));
 

	
 
                // get
 
@@ -768,7 +768,7 @@ fn connector_causal_loop() {
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(1));
 
@@ -783,7 +783,7 @@ fn connector_causal_loop() {
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(1));
 
@@ -812,7 +812,7 @@ fn connector_causal_loop2() {
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"foo".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"foo".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
@@ -850,7 +850,7 @@ fn connector_recover() {
 

	
 
            for i in 0..N {
 
                if putter_does(i) {
 
                    assert_eq!(Ok(()), x.put(0, b"msg".to_vec()));
 
                    assert_eq!(Ok(()), x.put(0, b"msg".to_vec().into()));
 
                }
 
                assert_eq!(expect_res(i), x.sync(comm_timeout));
 
            }
0 comments (0 inline, 0 general)