Changeset - 02eb59c6fd66
[Not reviewed]
0 7 0
Christopher Esterhuyse - 5 years ago 2020-05-18 08:59:28
christopher.esterhuyse@gmail.com
introduced new Payload type
7 files changed with 77 insertions and 50 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -21,16 +21,19 @@ pub use std::{
 
    time::Instant,
 
};
 
pub use Polarity::*;
 

	
 
///////////////////// DEFS /////////////////////
 

	
 
pub type Payload = Vec<u8>;
 
// pub type Payload = Vec<u8>;
 
pub type ControllerId = u32;
 
pub type ChannelIndex = u32;
 

	
 
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
pub struct Payload(Vec<u8>);
 

	
 
/// This is a unique identifier for a channel (i.e., port).
 
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]
 
pub struct ChannelId {
 
    pub(crate) controller_id: ControllerId,
 
    pub(crate) channel_index: ChannelIndex,
 
}
 
@@ -108,12 +111,40 @@ pub trait PolyContext {
 

	
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool>;
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload>;
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl Payload {
 
    pub fn new(len: usize) -> Payload {
 
        let mut v = Vec::with_capacity(len);
 
        unsafe {
 
            v.set_len(len);
 
        }
 
        Payload(v)
 
    }
 
    pub fn len(&self) -> usize {
 
        self.0.len()
 
    }
 
    pub fn as_slice(&self) -> &[u8] {
 
        &self.0
 
    }
 
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
 
        &mut self.0
 
    }
 
}
 
impl std::iter::FromIterator<u8> for Payload {
 
    fn from_iter<I: IntoIterator<Item = u8>>(it: I) -> Self {
 
        Self(it.into_iter().collect())
 
    }
 
}
 
impl From<Vec<u8>> for Payload {
 
    fn from(s: Vec<u8>) -> Self {
 
        Self(s.into())
 
    }
 
}
 
impl Debug for Port {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "Port({})", self.0)
 
    }
 
}
 
impl Key {
src/protocol/eval.rs
Show inline comments
 
@@ -45,24 +45,24 @@ pub enum Value {
 
    ByteArray(ByteArrayValue),
 
    ShortArray(ShortArrayValue),
 
    IntArray(IntArrayValue),
 
    LongArray(LongArrayValue),
 
}
 
impl Value {
 
    pub fn receive_message(buffer: &Vec<u8>) -> Value {
 
    pub fn receive_message(buffer: &Payload) -> Value {
 
        Value::Message(MessageValue(Some(buffer.clone())))
 
    }
 
    fn create_message(length: Value) -> Value {
 
        match length {
 
            Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => {
 
                let length: i64 = i64::from(length);
 
                if length < 0 || length > MESSAGE_MAX_LENGTH {
 
                    // Only messages within the expected length are allowed
 
                    Value::Message(MessageValue(None))
 
                } else {
 
                    Value::Message(MessageValue(Some(vec![0; length.try_into().unwrap()])))
 
                    Value::Message(MessageValue(Some(Payload::new(0))))
 
                }
 
            }
 
            _ => unimplemented!(),
 
        }
 
    }
 
    fn from_constant(constant: &Constant) -> Value {
 
@@ -105,31 +105,31 @@ impl Value {
 
        // And the value and the subject must be compatible
 
        match (self, value) {
 
            (Value::Message(MessageValue(None)), _) => {
 
                // It is inconsistent to update the null message
 
                None
 
            }
 
            (Value::Message(MessageValue(Some(buffer))), Value::Byte(ByteValue(b))) => {
 
            (Value::Message(MessageValue(Some(payload))), Value::Byte(ByteValue(b))) => {
 
                if *b < 0 {
 
                    // It is inconsistent to update with a negative value
 
                    return None;
 
                }
 
                if let Some(slot) = buffer.get_mut(the_index) {
 
                if let Some(slot) = payload.as_mut_slice().get_mut(the_index) {
 
                    *slot = (*b).try_into().unwrap();
 
                    Some(value.clone())
 
                } else {
 
                    // It is inconsistent to update out of bounds
 
                    None
 
                }
 
            }
 
            (Value::Message(MessageValue(Some(buffer))), Value::Short(ShortValue(b))) => {
 
            (Value::Message(MessageValue(Some(payload))), Value::Short(ShortValue(b))) => {
 
                if *b < 0 || *b > BYTE_MAX as i16 {
 
                    // It is inconsistent to update with a negative value or a too large value
 
                    return None;
 
                }
 
                if let Some(slot) = buffer.get_mut(the_index) {
 
                if let Some(slot) = payload.as_mut_slice().get_mut(the_index) {
 
                    *slot = (*b).try_into().unwrap();
 
                    Some(value.clone())
 
                } else {
 
                    // It is inconsistent to update out of bounds
 
                    None
 
                }
 
@@ -162,14 +162,14 @@ impl Value {
 
        // The subject must be either a message or an array
 
        match self {
 
            Value::Message(MessageValue(None)) => {
 
                // It is inconsistent to read from the null message
 
                None
 
            }
 
            Value::Message(MessageValue(Some(buffer))) => {
 
                if let Some(slot) = buffer.get(the_index) {
 
            Value::Message(MessageValue(Some(payload))) => {
 
                if let Some(slot) = payload.as_slice().get(the_index) {
 
                    Some(Value::Short(ShortValue((*slot).try_into().unwrap())))
 
                } else {
 
                    // It is inconsistent to update out of bounds
 
                    None
 
                }
 
            }
 
@@ -933,33 +933,25 @@ impl ValueImpl for OutputValue {
 
            _ => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct MessageValue(pub Option<Vec<u8>>);
 
pub struct MessageValue(pub Option<Payload>);
 

	
 
impl Display for MessageValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        match &self.0 {
 
            None => write!(f, "null"),
 
            Some(vec) => {
 
                write!(f, "#msg({};", vec.len())?;
 
                let mut i = 0;
 
                for v in vec.iter() {
 
                    if i > 0 {
 
                        write!(f, ",")?;
 
                    }
 
                    write!(f, "{}", v)?;
 
                    i += 1;
 
                    if i >= 10 {
 
                        write!(f, ",...")?;
 
                        break;
 
                    }
 
            Some(payload) => {
 
                // format print up to 10 bytes
 
                let mut slice = payload.as_slice();
 
                if slice.len() > 10 {
 
                    slice = &slice[..10];
 
                }
 
                write!(f, ")")
 
                f.debug_list().entries(slice.iter().copied()).finish()
 
            }
 
        }
 
    }
 
}
 

	
 
impl ValueImpl for MessageValue {
src/protocol/mod.rs
Show inline comments
 
@@ -225,14 +225,18 @@ impl EvalContext<'_> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => {
 
                let mut moved_keys = HashSet::new();
 
                for arg in args.iter() {
 
                    match arg {
 
                        Value::Output(OutputValue(key)) => { moved_keys.insert(*key); }
 
                        Value::Input(InputValue(key)) => { moved_keys.insert(*key); }
 
                        Value::Output(OutputValue(key)) => {
 
                            moved_keys.insert(*key);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            moved_keys.insert(*key);
 
                        }
 
                        _ => {}
 
                    }
 
                }
 
                context.new_component(moved_keys, init_state)
 
            }
 
            EvalContext::Poly(_) => unreachable!(),
 
@@ -244,13 +248,13 @@ impl EvalContext<'_> {
 
            EvalContext::Mono(context) => {
 
                let [from, to] = context.new_channel();
 
                let from = Value::Output(OutputValue(from));
 
                let to = Value::Input(InputValue(to));
 
                return [from, to];
 
            }
 
            EvalContext::Poly(_) => unreachable!()
 
            EvalContext::Poly(_) => unreachable!(),
 
        }
 
    }
 
    fn fires(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(_) => unreachable!(),
src/runtime/connector.rs
Show inline comments
 
@@ -179,9 +179,9 @@ impl Connector {
 
            connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let result = connected.controller.inner.mono_n.result.as_ref().ok_or(NoPreviousRound)?;
 
        let payload = result.1.get(&key).ok_or(DidNotGet)?;
 
        Ok(payload)
 
        Ok(payload.as_slice())
 
    }
 
}
src/runtime/ffi.rs
Show inline comments
 
@@ -255,15 +255,15 @@ pub unsafe extern "C" fn connector_put(
 
    connector: *mut Connector,
 
    proto_port_index: c_uint,
 
    buf_ptr: *mut c_uchar,
 
    msg_len: c_uint,
 
) -> c_int {
 
    let buf = std::slice::from_raw_parts_mut(buf_ptr, msg_len.try_into().unwrap());
 
    let payload = buf.to_vec(); // unsafe
 
    let vec: Vec<u8> = buf.to_vec(); // unsafe
 
    let mut b = Box::from_raw(connector); // unsafe!
 
    let ret = b.put(proto_port_index.try_into().unwrap(), payload);
 
    let ret = b.put(proto_port_index.try_into().unwrap(), vec.into());
 
    Box::into_raw(b); // don't drop!
 
    match ret {
 
        Ok(()) => 0,
 
        Err(e) => {
 
            overwrite_last_error(format!("{:?}", e).as_bytes());
 
            -1
src/runtime/serde.rs
Show inline comments
 
@@ -116,26 +116,26 @@ impl<R: Read> De<u64> for R {
 
    }
 
}
 

	
 
impl<W: Write> Ser<Payload> for W {
 
    fn ser(&mut self, t: &Payload) -> Result<(), std::io::Error> {
 
        self.ser(&VarLenInt(t.len() as u64))?;
 
        for byte in t {
 
        for byte in t.as_slice() {
 
            self.ser(byte)?;
 
        }
 
        Ok(())
 
    }
 
}
 
impl<R: Read> De<Payload> for R {
 
    fn de(&mut self) -> Result<Payload, std::io::Error> {
 
        let VarLenInt(len) = self.de()?;
 
        let mut x = Vec::with_capacity(len as usize);
 
        for _ in 0..len {
 
            x.push(self.de()?);
 
        }
 
        Ok(x)
 
        Ok(x.into())
 
    }
 
}
 

	
 
impl<W: Write> Ser<VarLenInt> for W {
 
    fn ser(&mut self, t: &VarLenInt) -> Result<(), std::io::Error> {
 
        integer_encoding::VarIntWriter::write_varint(self, t.0).map(|_| ())
src/test/connector.rs
Show inline comments
 
@@ -175,13 +175,13 @@ fn connector_self_forward_ok() {
 
            // Alice
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
@@ -254,13 +254,13 @@ fn connector_self_forward_timeout() {
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            x.put(0, MSG.to_vec()).unwrap();
 
            x.put(0, MSG.to_vec().into()).unwrap();
 
            // native and forward components cannot find a solution
 
            assert_eq!(Err(SyncErr::Timeout), x.sync(timeout));
 
        },
 
    ]));
 
}
 

	
 
@@ -279,13 +279,13 @@ fn connector_forward_det() {
 
        &|x| {
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
        &|x| {
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
@@ -316,13 +316,13 @@ fn connector_nondet_proto_det_natives() {
 
            // Alice
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync").unwrap();
 
@@ -354,13 +354,13 @@ fn connector_putter_determines() {
 
            // Alice
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync").unwrap();
 
@@ -395,13 +395,13 @@ fn connector_getter_determines() {
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                // batches [{0=>?}, {0=>*}]
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                x.next_batch().unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
@@ -440,13 +440,13 @@ fn connector_alternator_2() {
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                for _ in 0..2 {
 
                    x.put(0, MSG.to_vec()).unwrap();
 
                    x.put(0, MSG.to_vec().into()).unwrap();
 
                    assert_eq!(0, x.sync(timeout).unwrap());
 
                }
 
            }
 
        },
 
        &|x| {
 
            // A
 
@@ -502,13 +502,13 @@ fn connector_composite_chain_a() {
 
            // Alice
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"forward").unwrap();
 
@@ -541,13 +541,13 @@ fn connector_composite_chain_b() {
 
            // Alice
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync_2").unwrap();
 
@@ -582,13 +582,13 @@ fn connector_exchange() {
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Native).unwrap(); // native out
 
            x.bind_port(2, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(3, Passive(addrs[1])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
        &|x| {
 
@@ -597,13 +597,13 @@ fn connector_exchange() {
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Native).unwrap(); // native out
 
            x.bind_port(2, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(3, Active(addrs[0])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
@@ -626,14 +626,14 @@ fn connector_both() {
 
            x.bind_port(0, Native).unwrap(); // native in a
 
            x.bind_port(1, Passive(addrs[0])).unwrap(); // peer out a
 
            x.bind_port(2, Native).unwrap(); // native in b
 
            x.bind_port(3, Passive(addrs[1])).unwrap(); // peer out b
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"one".to_vec()));
 
                assert_eq!(Ok(()), x.put(1, b"two".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"one".to_vec().into()));
 
                assert_eq!(Ok(()), x.put(1, b"two".to_vec().into()));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"exchange").unwrap();
 
@@ -731,13 +731,13 @@ fn connector_fifo_1_e() {
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // put
 
                assert_eq!(Ok(()), x.put(0, b"message~".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"message~".to_vec().into()));
 
                assert_eq!(Ok(0), x.sync(timeout));
 

	
 
                // get
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"message~" as &[u8]), x.read_gotten(1));
 
@@ -765,13 +765,13 @@ fn connector_causal_loop() {
 
            x.bind_port(0, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(1, Passive(addrs[1])).unwrap(); // peer in
 
            x.bind_port(2, Native).unwrap(); // native in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
        &|x| {
 
@@ -780,13 +780,13 @@ fn connector_causal_loop() {
 
            x.bind_port(0, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(1, Active(addrs[0])).unwrap(); // peer in
 
            x.bind_port(2, Native).unwrap(); // native in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
@@ -809,13 +809,13 @@ fn connector_causal_loop2() {
 
            // Alice
 
            x.configure(PDL, b"samelen_repl").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"foo".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"foo".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
    ]));
 
}
 
@@ -847,13 +847,13 @@ fn connector_recover() {
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(connect_timeout).unwrap();
 

	
 
            for i in 0..N {
 
                if putter_does(i) {
 
                    assert_eq!(Ok(()), x.put(0, b"msg".to_vec()));
 
                    assert_eq!(Ok(()), x.put(0, b"msg".to_vec().into()));
 
                }
 
                assert_eq!(expect_res(i), x.sync(comm_timeout));
 
            }
 
        },
 
        &|x| {
 
            // Bob
0 comments (0 inline, 0 general)