Changeset - 02eb59c6fd66
[Not reviewed]
0 7 0
Christopher Esterhuyse - 5 years ago 2020-05-18 08:59:28
christopher.esterhuyse@gmail.com
introduced new Payload type
7 files changed with 77 insertions and 50 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
///////////////////// PRELUDE /////////////////////
 

	
 
pub use core::{
 
    cmp::Ordering,
 
    fmt::{Debug, Formatter},
 
    hash::{Hash, Hasher},
 
    ops::{Range, RangeFrom},
 
    time::Duration,
 
};
 
pub use indexmap::{IndexMap, IndexSet};
 
pub use maplit::{hashmap, hashset};
 
pub use mio::{
 
    net::{TcpListener, TcpStream},
 
    Event, Evented, Events, Poll, PollOpt, Ready, Token,
 
};
 
pub use std::{
 
    collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
 
    convert::TryInto,
 
    net::SocketAddr,
 
    sync::Arc,
 
    time::Instant,
 
};
 
pub use Polarity::*;
 

	
 
///////////////////// DEFS /////////////////////
 

	
 
pub type Payload = Vec<u8>;
 
// pub type Payload = Vec<u8>;
 
pub type ControllerId = u32;
 
pub type ChannelIndex = u32;
 

	
 
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
pub struct Payload(Vec<u8>);
 

	
 
/// This is a unique identifier for a channel (i.e., port).
 
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]
 
pub struct ChannelId {
 
    pub(crate) controller_id: ControllerId,
 
    pub(crate) channel_index: ChannelIndex,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]
 
pub enum Polarity {
 
    Putter, // output port (from the perspective of the component)
 
    Getter, // input port (from the perspective of the component)
 
}
 

	
 
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone)]
 
#[repr(C)]
 
pub struct Port(pub usize); // ports are COPY
 
pub type Key = Port;
 

	
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum MainComponentErr {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(Port),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(Port),
 
    WrongPortPolarity { param_index: usize, port: Port },
 
    DuplicateMovedPort(Port),
 
}
 
pub trait ProtocolDescription: Sized {
 
    type S: ComponentState<D = Self>;
 

	
 
    fn parse(pdl: &[u8]) -> Result<Self, String>;
 
    fn component_polarities(&self, identifier: &[u8]) -> Result<Vec<Polarity>, MainComponentErr>;
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Key]) -> Self::S;
 
}
 

	
 
pub trait ComponentState: Sized + Clone {
 
    type D: ProtocolDescription;
 
    fn pre_sync_run<C: MonoContext<D = Self::D, S = Self>>(
 
        &mut self,
 
        runtime_ctx: &mut C,
 
        protocol_description: &Self::D,
 
    ) -> MonoBlocker;
 

	
 
    fn sync_run<C: PolyContext<D = Self::D>>(
 
        &mut self,
 
        runtime_ctx: &mut C,
 
        protocol_description: &Self::D,
 
    ) -> PolyBlocker;
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum MonoBlocker {
 
    Inconsistent,
 
    ComponentExit,
 
    SyncBlockStart,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum PolyBlocker {
 
    Inconsistent,
 
    SyncBlockEnd,
 
    CouldntReadMsg(Key),
 
    CouldntCheckFiring(Key),
 
    PutMsg(Key, Payload),
 
}
 

	
 
pub trait MonoContext {
 
    type D: ProtocolDescription;
 
    type S: ComponentState<D = Self::D>;
 

	
 
    fn new_component(&mut self, moved_keys: HashSet<Key>, init_state: Self::S);
 
    fn new_channel(&mut self) -> [Key; 2];
 
    fn new_random(&mut self) -> u64;
 
}
 
pub trait PolyContext {
 
    type D: ProtocolDescription;
 

	
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool>;
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload>;
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl Payload {
 
    pub fn new(len: usize) -> Payload {
 
        let mut v = Vec::with_capacity(len);
 
        unsafe {
 
            v.set_len(len);
 
        }
 
        Payload(v)
 
    }
 
    pub fn len(&self) -> usize {
 
        self.0.len()
 
    }
 
    pub fn as_slice(&self) -> &[u8] {
 
        &self.0
 
    }
 
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
 
        &mut self.0
 
    }
 
}
 
impl std::iter::FromIterator<u8> for Payload {
 
    fn from_iter<I: IntoIterator<Item = u8>>(it: I) -> Self {
 
        Self(it.into_iter().collect())
 
    }
 
}
 
impl From<Vec<u8>> for Payload {
 
    fn from(s: Vec<u8>) -> Self {
 
        Self(s.into())
 
    }
 
}
 
impl Debug for Port {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "Port({})", self.0)
 
    }
 
}
 
impl Key {
 
    pub fn from_raw(raw: usize) -> Self {
 
        Self(raw)
 
    }
 
    pub fn to_raw(self) -> usize {
 
        self.0
 
    }
 
    pub fn to_token(self) -> mio::Token {
 
        mio::Token(self.0.try_into().unwrap())
 
    }
 
    pub fn from_token(t: mio::Token) -> Self {
 
        Self(t.0.try_into().unwrap())
 
    }
 
}
src/protocol/eval.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::fmt;
 
use std::fmt::{Debug, Display, Formatter};
 
use std::{i16, i32, i64, i8};
 

	
 
use crate::common::*;
 

	
 
use crate::protocol::ast::*;
 
use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 
use crate::protocol::EvalContext;
 

	
 
const MAX_RECURSION: usize = 1024;
 

	
 
const BYTE_MIN: i64 = i8::MIN as i64;
 
const BYTE_MAX: i64 = i8::MAX as i64;
 
const SHORT_MIN: i64 = i16::MIN as i64;
 
const SHORT_MAX: i64 = i16::MAX as i64;
 
const INT_MIN: i64 = i32::MIN as i64;
 
const INT_MAX: i64 = i32::MAX as i64;
 

	
 
const MESSAGE_MAX_LENGTH: i64 = SHORT_MAX;
 

	
 
const ONE: Value = Value::Byte(ByteValue(1));
 

	
 
trait ValueImpl {
 
    fn exact_type(&self) -> Type;
 
    fn is_type_compatible(&self, t: &Type) -> bool;
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum Value {
 
    Input(InputValue),
 
    Output(OutputValue),
 
    Message(MessageValue),
 
    Boolean(BooleanValue),
 
    Byte(ByteValue),
 
    Short(ShortValue),
 
    Int(IntValue),
 
    Long(LongValue),
 
    InputArray(InputArrayValue),
 
    OutputArray(OutputArrayValue),
 
    MessageArray(MessageArrayValue),
 
    BooleanArray(BooleanArrayValue),
 
    ByteArray(ByteArrayValue),
 
    ShortArray(ShortArrayValue),
 
    IntArray(IntArrayValue),
 
    LongArray(LongArrayValue),
 
}
 
impl Value {
 
    pub fn receive_message(buffer: &Vec<u8>) -> Value {
 
    pub fn receive_message(buffer: &Payload) -> Value {
 
        Value::Message(MessageValue(Some(buffer.clone())))
 
    }
 
    fn create_message(length: Value) -> Value {
 
        match length {
 
            Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => {
 
                let length: i64 = i64::from(length);
 
                if length < 0 || length > MESSAGE_MAX_LENGTH {
 
                    // Only messages within the expected length are allowed
 
                    Value::Message(MessageValue(None))
 
                } else {
 
                    Value::Message(MessageValue(Some(vec![0; length.try_into().unwrap()])))
 
                    Value::Message(MessageValue(Some(Payload::new(0))))
 
                }
 
            }
 
            _ => unimplemented!(),
 
        }
 
    }
 
    fn from_constant(constant: &Constant) -> Value {
 
        match constant {
 
            Constant::Null => Value::Message(MessageValue(None)),
 
            Constant::True => Value::Boolean(BooleanValue(true)),
 
            Constant::False => Value::Boolean(BooleanValue(false)),
 
            Constant::Integer(data) => {
 
                // Convert raw ASCII data to UTF-8 string
 
                let raw = String::from_utf8_lossy(data);
 
                let val = raw.parse::<i64>().unwrap();
 
                if val >= BYTE_MIN && val <= BYTE_MAX {
 
                    Value::Byte(ByteValue(val as i8))
 
                } else if val >= SHORT_MIN && val <= SHORT_MAX {
 
                    Value::Short(ShortValue(val as i16))
 
                } else if val >= INT_MIN && val <= INT_MAX {
 
                    Value::Int(IntValue(val as i32))
 
                } else {
 
                    Value::Long(LongValue(val))
 
                }
 
            }
 
            Constant::Character(data) => unimplemented!(),
 
        }
 
    }
 
    fn set(&mut self, index: &Value, value: &Value) -> Option<Value> {
 
        // The index must be of integer type, and non-negative
 
        let the_index: usize;
 
        match index {
 
            Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => {
 
                let index = i64::from(index);
 
                if index < 0 || index >= MESSAGE_MAX_LENGTH {
 
                    // It is inconsistent to update out of bounds
 
                    return None;
 
                }
 
                the_index = index.try_into().unwrap();
 
            }
 
            _ => unreachable!(),
 
        }
 
        // The subject must be either a message or an array
 
        // And the value and the subject must be compatible
 
        match (self, value) {
 
            (Value::Message(MessageValue(None)), _) => {
 
                // It is inconsistent to update the null message
 
                None
 
            }
 
            (Value::Message(MessageValue(Some(buffer))), Value::Byte(ByteValue(b))) => {
 
            (Value::Message(MessageValue(Some(payload))), Value::Byte(ByteValue(b))) => {
 
                if *b < 0 {
 
                    // It is inconsistent to update with a negative value
 
                    return None;
 
                }
 
                if let Some(slot) = buffer.get_mut(the_index) {
 
                if let Some(slot) = payload.as_mut_slice().get_mut(the_index) {
 
                    *slot = (*b).try_into().unwrap();
 
                    Some(value.clone())
 
                } else {
 
                    // It is inconsistent to update out of bounds
 
                    None
 
                }
 
            }
 
            (Value::Message(MessageValue(Some(buffer))), Value::Short(ShortValue(b))) => {
 
            (Value::Message(MessageValue(Some(payload))), Value::Short(ShortValue(b))) => {
 
                if *b < 0 || *b > BYTE_MAX as i16 {
 
                    // It is inconsistent to update with a negative value or a too large value
 
                    return None;
 
                }
 
                if let Some(slot) = buffer.get_mut(the_index) {
 
                if let Some(slot) = payload.as_mut_slice().get_mut(the_index) {
 
                    *slot = (*b).try_into().unwrap();
 
                    Some(value.clone())
 
                } else {
 
                    // It is inconsistent to update out of bounds
 
                    None
 
                }
 
            }
 
            (Value::InputArray(_), Value::Input(_)) => todo!(),
 
            (Value::OutputArray(_), Value::Output(_)) => todo!(),
 
            (Value::MessageArray(_), Value::Message(_)) => todo!(),
 
            (Value::BooleanArray(_), Value::Boolean(_)) => todo!(),
 
            (Value::ByteArray(_), Value::Byte(_)) => todo!(),
 
            (Value::ShortArray(_), Value::Short(_)) => todo!(),
 
            (Value::IntArray(_), Value::Int(_)) => todo!(),
 
            (Value::LongArray(_), Value::Long(_)) => todo!(),
 
            _ => unreachable!(),
 
        }
 
    }
 
    fn get(&self, index: &Value) -> Option<Value> {
 
        // The index must be of integer type, and non-negative
 
        let the_index: usize;
 
        match index {
 
            Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => {
 
                let index = i64::from(index);
 
                if index < 0 || index >= MESSAGE_MAX_LENGTH {
 
                    // It is inconsistent to update out of bounds
 
                    return None;
 
                }
 
                the_index = index.try_into().unwrap();
 
            }
 
            _ => unreachable!(),
 
        }
 
        // The subject must be either a message or an array
 
        match self {
 
            Value::Message(MessageValue(None)) => {
 
                // It is inconsistent to read from the null message
 
                None
 
            }
 
            Value::Message(MessageValue(Some(buffer))) => {
 
                if let Some(slot) = buffer.get(the_index) {
 
            Value::Message(MessageValue(Some(payload))) => {
 
                if let Some(slot) = payload.as_slice().get(the_index) {
 
                    Some(Value::Short(ShortValue((*slot).try_into().unwrap())))
 
                } else {
 
                    // It is inconsistent to update out of bounds
 
                    None
 
                }
 
            }
 
            Value::InputArray(_) => todo!(),
 
            Value::OutputArray(_) => todo!(),
 
            Value::MessageArray(_) => todo!(),
 
            Value::BooleanArray(_) => todo!(),
 
            Value::ByteArray(_) => todo!(),
 
            Value::ShortArray(_) => todo!(),
 
            Value::IntArray(_) => todo!(),
 
            Value::LongArray(_) => todo!(),
 
            _ => unreachable!(),
 
        }
 
    }
 
    fn length(&self) -> Option<Value> {
 
        // The subject must be either a message or an array
 
        match self {
 
            Value::Message(MessageValue(None)) => {
 
                // It is inconsistent to get length from the null message
 
                None
 
            }
 
            Value::Message(MessageValue(Some(buffer))) => {
 
                Some(Value::Int(IntValue((buffer.len()).try_into().unwrap())))
 
            }
 
            Value::InputArray(InputArrayValue(vec)) => {
 
                Some(Value::Int(IntValue((vec.len()).try_into().unwrap())))
 
            }
 
            Value::OutputArray(OutputArrayValue(vec)) => {
 
                Some(Value::Int(IntValue((vec.len()).try_into().unwrap())))
 
            }
 
            Value::MessageArray(MessageArrayValue(vec)) => {
 
                Some(Value::Int(IntValue((vec.len()).try_into().unwrap())))
 
            }
 
            Value::BooleanArray(BooleanArrayValue(vec)) => {
 
                Some(Value::Int(IntValue((vec.len()).try_into().unwrap())))
 
            }
 
            Value::ByteArray(ByteArrayValue(vec)) => {
 
                Some(Value::Int(IntValue((vec.len()).try_into().unwrap())))
 
            }
 
            Value::ShortArray(ShortArrayValue(vec)) => {
 
                Some(Value::Int(IntValue((vec.len()).try_into().unwrap())))
 
            }
 
            Value::IntArray(IntArrayValue(vec)) => {
 
                Some(Value::Int(IntValue((vec.len()).try_into().unwrap())))
 
            }
 
            Value::LongArray(LongArrayValue(vec)) => {
 
                Some(Value::Int(IntValue((vec.len()).try_into().unwrap())))
 
            }
 
            _ => unreachable!(),
 
        }
 
    }
 
    fn plus(&self, other: &Value) -> Value {
 
        match (self, other) {
 
            (Value::Byte(ByteValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Byte(ByteValue(*s + *o))
 
            }
 
            (Value::Byte(ByteValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Short(ShortValue(*s as i16 + *o))
 
            }
 
            (Value::Byte(ByteValue(s)), Value::Int(IntValue(o))) => {
 
                Value::Int(IntValue(*s as i32 + *o))
 
            }
 
            (Value::Byte(ByteValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s as i64 + *o))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Short(ShortValue(*s + *o as i16))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Short(ShortValue(*s + *o))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Int(IntValue(o))) => {
 
                Value::Int(IntValue(*s as i32 + *o))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s as i64 + *o))
 
            }
 
            (Value::Int(IntValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Int(IntValue(*s + *o as i32))
 
            }
 
            (Value::Int(IntValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Int(IntValue(*s + *o as i32))
 
            }
 
            (Value::Int(IntValue(s)), Value::Int(IntValue(o))) => Value::Int(IntValue(*s + *o)),
 
            (Value::Int(IntValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s as i64 + *o))
 
            }
 
            (Value::Long(LongValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Long(LongValue(*s + *o as i64))
 
            }
 
            (Value::Long(LongValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Long(LongValue(*s + *o as i64))
 
            }
 
            (Value::Long(LongValue(s)), Value::Int(IntValue(o))) => {
 
                Value::Long(LongValue(*s + *o as i64))
 
            }
 
            (Value::Long(LongValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s + *o))
 
            }
 
            _ => unimplemented!(),
 
        }
 
    }
 
    fn minus(&self, other: &Value) -> Value {
 
        match (self, other) {
 
            (Value::Byte(ByteValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Byte(ByteValue(*s - *o))
 
            }
 
            (Value::Byte(ByteValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Short(ShortValue(*s as i16 - *o))
 
            }
 
            (Value::Byte(ByteValue(s)), Value::Int(IntValue(o))) => {
 
                Value::Int(IntValue(*s as i32 - *o))
 
            }
 
            (Value::Byte(ByteValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s as i64 - *o))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Short(ShortValue(*s - *o as i16))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Short(ShortValue(*s - *o))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Int(IntValue(o))) => {
 
                Value::Int(IntValue(*s as i32 - *o))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s as i64 - *o))
 
            }
 
            (Value::Int(IntValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Int(IntValue(*s - *o as i32))
 
            }
 
            (Value::Int(IntValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Int(IntValue(*s - *o as i32))
 
            }
 
            (Value::Int(IntValue(s)), Value::Int(IntValue(o))) => Value::Int(IntValue(*s - *o)),
 
            (Value::Int(IntValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s as i64 - *o))
 
            }
 
            (Value::Long(LongValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Long(LongValue(*s - *o as i64))
 
            }
 
            (Value::Long(LongValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Long(LongValue(*s - *o as i64))
 
            }
 
            (Value::Long(LongValue(s)), Value::Int(IntValue(o))) => {
 
                Value::Long(LongValue(*s - *o as i64))
 
            }
 
            (Value::Long(LongValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s - *o))
 
            }
 
            _ => unimplemented!(),
 
        }
 
    }
 
    fn modulus(&self, other: &Value) -> Value {
 
        match (self, other) {
 
            (Value::Byte(ByteValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Byte(ByteValue(*s % *o))
 
            }
 
            (Value::Byte(ByteValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Short(ShortValue(*s as i16 % *o))
 
            }
 
            (Value::Byte(ByteValue(s)), Value::Int(IntValue(o))) => {
 
                Value::Int(IntValue(*s as i32 % *o))
 
            }
 
            (Value::Byte(ByteValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s as i64 % *o))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Short(ShortValue(*s % *o as i16))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Short(ShortValue(*s % *o))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Int(IntValue(o))) => {
 
                Value::Int(IntValue(*s as i32 % *o))
 
            }
 
            (Value::Short(ShortValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s as i64 % *o))
 
            }
 
            (Value::Int(IntValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Int(IntValue(*s % *o as i32))
 
            }
 
            (Value::Int(IntValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Int(IntValue(*s % *o as i32))
 
            }
 
            (Value::Int(IntValue(s)), Value::Int(IntValue(o))) => Value::Int(IntValue(*s % *o)),
 
            (Value::Int(IntValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s as i64 % *o))
 
            }
 
@@ -747,405 +747,397 @@ impl From<Value> for i8 {
 
}
 
impl From<&Value> for i8 {
 
    fn from(val: &Value) -> Self {
 
        match val {
 
            Value::Byte(ByteValue(b)) => *b,
 
            _ => unimplemented!(),
 
        }
 
    }
 
}
 

	
 
impl From<Value> for i16 {
 
    fn from(val: Value) -> Self {
 
        match val {
 
            Value::Byte(ByteValue(b)) => i16::from(b),
 
            Value::Short(ShortValue(s)) => s,
 
            _ => unimplemented!(),
 
        }
 
    }
 
}
 
impl From<&Value> for i16 {
 
    fn from(val: &Value) -> Self {
 
        match val {
 
            Value::Byte(ByteValue(b)) => i16::from(*b),
 
            Value::Short(ShortValue(s)) => *s,
 
            _ => unimplemented!(),
 
        }
 
    }
 
}
 

	
 
impl From<Value> for i32 {
 
    fn from(val: Value) -> Self {
 
        match val {
 
            Value::Byte(ByteValue(b)) => i32::from(b),
 
            Value::Short(ShortValue(s)) => i32::from(s),
 
            Value::Int(IntValue(i)) => i,
 
            _ => unimplemented!(),
 
        }
 
    }
 
}
 
impl From<&Value> for i32 {
 
    fn from(val: &Value) -> Self {
 
        match val {
 
            Value::Byte(ByteValue(b)) => i32::from(*b),
 
            Value::Short(ShortValue(s)) => i32::from(*s),
 
            Value::Int(IntValue(i)) => *i,
 
            _ => unimplemented!(),
 
        }
 
    }
 
}
 

	
 
impl From<Value> for i64 {
 
    fn from(val: Value) -> Self {
 
        match val {
 
            Value::Byte(ByteValue(b)) => i64::from(b),
 
            Value::Short(ShortValue(s)) => i64::from(s),
 
            Value::Int(IntValue(i)) => i64::from(i),
 
            Value::Long(LongValue(l)) => l,
 
            _ => unimplemented!(),
 
        }
 
    }
 
}
 
impl From<&Value> for i64 {
 
    fn from(val: &Value) -> Self {
 
        match val {
 
            Value::Byte(ByteValue(b)) => i64::from(*b),
 
            Value::Short(ShortValue(s)) => i64::from(*s),
 
            Value::Int(IntValue(i)) => i64::from(*i),
 
            Value::Long(LongValue(l)) => *l,
 
            _ => unimplemented!(),
 
        }
 
    }
 
}
 

	
 
impl ValueImpl for Value {
 
    fn exact_type(&self) -> Type {
 
        match self {
 
            Value::Input(val) => val.exact_type(),
 
            Value::Output(val) => val.exact_type(),
 
            Value::Message(val) => val.exact_type(),
 
            Value::Boolean(val) => val.exact_type(),
 
            Value::Byte(val) => val.exact_type(),
 
            Value::Short(val) => val.exact_type(),
 
            Value::Int(val) => val.exact_type(),
 
            Value::Long(val) => val.exact_type(),
 
            Value::InputArray(val) => val.exact_type(),
 
            Value::OutputArray(val) => val.exact_type(),
 
            Value::MessageArray(val) => val.exact_type(),
 
            Value::BooleanArray(val) => val.exact_type(),
 
            Value::ByteArray(val) => val.exact_type(),
 
            Value::ShortArray(val) => val.exact_type(),
 
            Value::IntArray(val) => val.exact_type(),
 
            Value::LongArray(val) => val.exact_type(),
 
        }
 
    }
 
    fn is_type_compatible(&self, t: &Type) -> bool {
 
        match self {
 
            Value::Input(val) => val.is_type_compatible(t),
 
            Value::Output(val) => val.is_type_compatible(t),
 
            Value::Message(val) => val.is_type_compatible(t),
 
            Value::Boolean(val) => val.is_type_compatible(t),
 
            Value::Byte(val) => val.is_type_compatible(t),
 
            Value::Short(val) => val.is_type_compatible(t),
 
            Value::Int(val) => val.is_type_compatible(t),
 
            Value::Long(val) => val.is_type_compatible(t),
 
            Value::InputArray(val) => val.is_type_compatible(t),
 
            Value::OutputArray(val) => val.is_type_compatible(t),
 
            Value::MessageArray(val) => val.is_type_compatible(t),
 
            Value::BooleanArray(val) => val.is_type_compatible(t),
 
            Value::ByteArray(val) => val.is_type_compatible(t),
 
            Value::ShortArray(val) => val.is_type_compatible(t),
 
            Value::IntArray(val) => val.is_type_compatible(t),
 
            Value::LongArray(val) => val.is_type_compatible(t),
 
        }
 
    }
 
}
 

	
 
impl Display for Value {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        let disp: &dyn Display;
 
        match self {
 
            Value::Input(val) => disp = val,
 
            Value::Output(val) => disp = val,
 
            Value::Message(val) => disp = val,
 
            Value::Boolean(val) => disp = val,
 
            Value::Byte(val) => disp = val,
 
            Value::Short(val) => disp = val,
 
            Value::Int(val) => disp = val,
 
            Value::Long(val) => disp = val,
 
            Value::InputArray(val) => disp = val,
 
            Value::OutputArray(val) => disp = val,
 
            Value::MessageArray(val) => disp = val,
 
            Value::BooleanArray(val) => disp = val,
 
            Value::ByteArray(val) => disp = val,
 
            Value::ShortArray(val) => disp = val,
 
            Value::IntArray(val) => disp = val,
 
            Value::LongArray(val) => disp = val,
 
        }
 
        disp.fmt(f)
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct InputValue(pub Key);
 

	
 
impl Display for InputValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        write!(f, "#in")
 
    }
 
}
 

	
 
impl ValueImpl for InputValue {
 
    fn exact_type(&self) -> Type {
 
        Type::INPUT
 
    }
 
    fn is_type_compatible(&self, t: &Type) -> bool {
 
        let Type { primitive, array } = t;
 
        if *array {
 
            return false;
 
        }
 
        match primitive {
 
            PrimitiveType::Input => true,
 
            _ => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct OutputValue(pub Key);
 

	
 
impl Display for OutputValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        write!(f, "#out")
 
    }
 
}
 

	
 
impl ValueImpl for OutputValue {
 
    fn exact_type(&self) -> Type {
 
        Type::OUTPUT
 
    }
 
    fn is_type_compatible(&self, t: &Type) -> bool {
 
        let Type { primitive, array } = t;
 
        if *array {
 
            return false;
 
        }
 
        match primitive {
 
            PrimitiveType::Output => true,
 
            _ => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct MessageValue(pub Option<Vec<u8>>);
 
pub struct MessageValue(pub Option<Payload>);
 

	
 
impl Display for MessageValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        match &self.0 {
 
            None => write!(f, "null"),
 
            Some(vec) => {
 
                write!(f, "#msg({};", vec.len())?;
 
                let mut i = 0;
 
                for v in vec.iter() {
 
                    if i > 0 {
 
                        write!(f, ",")?;
 
                    }
 
                    write!(f, "{}", v)?;
 
                    i += 1;
 
                    if i >= 10 {
 
                        write!(f, ",...")?;
 
                        break;
 
                    }
 
            Some(payload) => {
 
                // format print up to 10 bytes
 
                let mut slice = payload.as_slice();
 
                if slice.len() > 10 {
 
                    slice = &slice[..10];
 
                }
 
                write!(f, ")")
 
                f.debug_list().entries(slice.iter().copied()).finish()
 
            }
 
        }
 
    }
 
}
 

	
 
impl ValueImpl for MessageValue {
 
    fn exact_type(&self) -> Type {
 
        Type::MESSAGE
 
    }
 
    fn is_type_compatible(&self, t: &Type) -> bool {
 
        let Type { primitive, array } = t;
 
        if *array {
 
            return false;
 
        }
 
        match primitive {
 
            PrimitiveType::Message => true,
 
            _ => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct BooleanValue(bool);
 

	
 
impl Display for BooleanValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        write!(f, "{}", self.0)
 
    }
 
}
 

	
 
impl ValueImpl for BooleanValue {
 
    fn exact_type(&self) -> Type {
 
        Type::BOOLEAN
 
    }
 
    fn is_type_compatible(&self, t: &Type) -> bool {
 
        let Type { primitive, array } = t;
 
        if *array {
 
            return false;
 
        }
 
        match primitive {
 
            PrimitiveType::Boolean => true,
 
            PrimitiveType::Byte => true,
 
            PrimitiveType::Short => true,
 
            PrimitiveType::Int => true,
 
            PrimitiveType::Long => true,
 
            _ => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ByteValue(i8);
 

	
 
impl Display for ByteValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        write!(f, "{}", self.0)
 
    }
 
}
 

	
 
impl ValueImpl for ByteValue {
 
    fn exact_type(&self) -> Type {
 
        Type::BYTE
 
    }
 
    fn is_type_compatible(&self, t: &Type) -> bool {
 
        let Type { primitive, array } = t;
 
        if *array {
 
            return false;
 
        }
 
        match primitive {
 
            PrimitiveType::Byte => true,
 
            PrimitiveType::Short => true,
 
            PrimitiveType::Int => true,
 
            PrimitiveType::Long => true,
 
            _ => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ShortValue(i16);
 

	
 
impl Display for ShortValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        write!(f, "{}", self.0)
 
    }
 
}
 

	
 
impl ValueImpl for ShortValue {
 
    fn exact_type(&self) -> Type {
 
        Type::SHORT
 
    }
 
    fn is_type_compatible(&self, t: &Type) -> bool {
 
        let Type { primitive, array } = t;
 
        if *array {
 
            return false;
 
        }
 
        match primitive {
 
            PrimitiveType::Short => true,
 
            PrimitiveType::Int => true,
 
            PrimitiveType::Long => true,
 
            _ => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct IntValue(i32);
 

	
 
impl Display for IntValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        write!(f, "{}", self.0)
 
    }
 
}
 

	
 
impl ValueImpl for IntValue {
 
    fn exact_type(&self) -> Type {
 
        Type::INT
 
    }
 
    fn is_type_compatible(&self, t: &Type) -> bool {
 
        let Type { primitive, array } = t;
 
        if *array {
 
            return false;
 
        }
 
        match primitive {
 
            PrimitiveType::Int => true,
 
            PrimitiveType::Long => true,
 
            _ => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct LongValue(i64);
 

	
 
impl Display for LongValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        write!(f, "{}", self.0)
 
    }
 
}
 

	
 
impl ValueImpl for LongValue {
 
    fn exact_type(&self) -> Type {
 
        Type::LONG
 
    }
 
    fn is_type_compatible(&self, t: &Type) -> bool {
 
        let Type { primitive, array } = t;
 
        if *array {
 
            return false;
 
        }
 
        match primitive {
 
            PrimitiveType::Long => true,
 
            _ => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct InputArrayValue(Vec<InputValue>);
 

	
 
impl Display for InputArrayValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
        write!(f, "{{")?;
 
        let mut first = true;
 
        for v in self.0.iter() {
 
            if !first {
 
                write!(f, ",")?;
 
            }
 
            write!(f, "{}", v)?;
 
            first = false;
 
        }
 
        write!(f, "}}")
 
    }
 
}
 

	
 
impl ValueImpl for InputArrayValue {
 
    fn exact_type(&self) -> Type {
 
        Type::INPUT_ARRAY
 
    }
 
    fn is_type_compatible(&self, t: &Type) -> bool {
 
        let Type { primitive, array } = t;
 
        if !*array {
 
            return false;
 
        }
 
        match primitive {
 
            PrimitiveType::Input => true,
 
            _ => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct OutputArrayValue(Vec<OutputValue>);
src/protocol/mod.rs
Show inline comments
 
@@ -39,239 +39,243 @@ impl ProtocolDescription for ProtocolDescriptionImpl {
 
                err.write(&source, &mut vec).unwrap();
 
                Err(String::from_utf8_lossy(&vec).to_string())
 
            }
 
        }
 
    }
 
    fn component_polarities(&self, identifier: &[u8]) -> Result<Vec<Polarity>, MainComponentErr> {
 
        let h = &self.heap;
 
        let root = &h[self.root];
 
        let def = root.get_definition_ident(h, identifier);
 
        if def.is_none() {
 
            return Err(MainComponentErr::NoSuchComponent);
 
        }
 
        let def = &h[def.unwrap()];
 
        if !def.is_component() {
 
            return Err(MainComponentErr::NoSuchComponent);
 
        }
 
        for &param in def.parameters().iter() {
 
            let param = &h[param];
 
            let type_annot = &h[param.type_annotation];
 
            if type_annot.the_type.array {
 
                return Err(MainComponentErr::NonPortTypeParameters);
 
            }
 
            match type_annot.the_type.primitive {
 
                PrimitiveType::Input | PrimitiveType::Output => continue,
 
                _ => {
 
                    return Err(MainComponentErr::NonPortTypeParameters);
 
                }
 
            }
 
        }
 
        let mut result = Vec::new();
 
        for &param in def.parameters().iter() {
 
            let param = &h[param];
 
            let type_annot = &h[param.type_annotation];
 
            let ptype = &type_annot.the_type.primitive;
 
            if ptype == &PrimitiveType::Input {
 
                result.push(Polarity::Getter)
 
            } else if ptype == &PrimitiveType::Output {
 
                result.push(Polarity::Putter)
 
            } else {
 
                unreachable!()
 
            }
 
        }
 
        Ok(result)
 
    }
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Key]) -> ComponentStateImpl {
 
        let mut args = Vec::new();
 
        for (&x, y) in ports.iter().zip(self.component_polarities(identifier).unwrap()) {
 
            match y {
 
                Polarity::Getter => args.push(Value::Input(InputValue(x))),
 
                Polarity::Putter => args.push(Value::Output(OutputValue(x))),
 
            }
 
        }
 
        let h = &self.heap;
 
        let root = &h[self.root];
 
        let def = root.get_definition_ident(h, identifier).unwrap();
 
        ComponentStateImpl { prompt: Prompt::new(h, def, &args) }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ComponentStateImpl {
 
    prompt: Prompt,
 
}
 
impl ComponentState for ComponentStateImpl {
 
    type D = ProtocolDescriptionImpl;
 

	
 
    fn pre_sync_run<C: MonoContext<D = ProtocolDescriptionImpl, S = Self>>(
 
        &mut self,
 
        context: &mut C,
 
        pd: &ProtocolDescriptionImpl,
 
    ) -> MonoBlocker {
 
        let mut context = EvalContext::Mono(context);
 
        loop {
 
            let result = self.prompt.step(&pd.heap, &mut context);
 
            match result {
 
                // In component definitions, there are no return statements
 
                Ok(_) => unreachable!(),
 
                Err(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return MonoBlocker::Inconsistent,
 
                    EvalContinuation::Terminal => return MonoBlocker::ComponentExit,
 
                    EvalContinuation::SyncBlockStart => return MonoBlocker::SyncBlockStart,
 
                    // Not possible to end sync block if never entered one
 
                    EvalContinuation::SyncBlockEnd => unreachable!(),
 
                    EvalContinuation::NewComponent(decl, args) => {
 
                        // Look up definition (TODO for now, assume it is a definition)
 
                        let h = &pd.heap;
 
                        let def = h[decl].as_defined().definition;
 
                        let init_state = ComponentStateImpl { prompt: Prompt::new(h, def, &args) };
 
                        context.new_component(&args, init_state);
 
                        // Continue stepping
 
                        continue;
 
                    }
 
                    // Outside synchronous blocks, no fires/get/put happens
 
                    EvalContinuation::BlockFires(_) => unreachable!(),
 
                    EvalContinuation::BlockGet(_) => unreachable!(),
 
                    EvalContinuation::Put(_, _) => unreachable!(),
 
                },
 
            }
 
        }
 
    }
 

	
 
    fn sync_run<C: PolyContext<D = ProtocolDescriptionImpl>>(
 
        &mut self,
 
        context: &mut C,
 
        pd: &ProtocolDescriptionImpl,
 
    ) -> PolyBlocker {
 
        let mut context = EvalContext::Poly(context);
 
        loop {
 
            let result = self.prompt.step(&pd.heap, &mut context);
 
            match result {
 
                // Inside synchronous blocks, there are no return statements
 
                Ok(_) => unreachable!(),
 
                Err(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return PolyBlocker::Inconsistent,
 
                    // First need to exit synchronous block before definition may end
 
                    EvalContinuation::Terminal => unreachable!(),
 
                    // No nested synchronous blocks
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => return PolyBlocker::SyncBlockEnd,
 
                    // Not possible to create component in sync block
 
                    EvalContinuation::NewComponent(_, _) => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => match port {
 
                        Value::Output(OutputValue(key)) => {
 
                            return PolyBlocker::CouldntCheckFiring(key);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            return PolyBlocker::CouldntCheckFiring(key);
 
                        }
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::BlockGet(port) => match port {
 
                        Value::Output(OutputValue(key)) => {
 
                            return PolyBlocker::CouldntReadMsg(key);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            return PolyBlocker::CouldntReadMsg(key);
 
                        }
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::Put(port, message) => {
 
                        let key;
 
                        match port {
 
                            Value::Output(OutputValue(the_key)) => {
 
                                key = the_key;
 
                            }
 
                            Value::Input(InputValue(the_key)) => {
 
                                key = the_key;
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        let payload;
 
                        match message {
 
                            Value::Message(MessageValue(None)) => {
 
                                // Putting a null message is inconsistent
 
                                return PolyBlocker::Inconsistent;
 
                            }
 
                            Value::Message(MessageValue(Some(buffer))) => {
 
                                // Create a copy of the payload
 
                                payload = buffer.clone();
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        return PolyBlocker::PutMsg(key, payload);
 
                    }
 
                },
 
            }
 
        }
 
    }
 
}
 

	
 
pub enum EvalContext<'a> {
 
    Mono(&'a mut dyn MonoContext<D = ProtocolDescriptionImpl, S = ComponentStateImpl>),
 
    Poly(&'a mut dyn PolyContext<D = ProtocolDescriptionImpl>),
 
    None,
 
}
 
impl EvalContext<'_> {
 
    fn random(&mut self) -> LongValue {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => todo!(),
 
            EvalContext::Poly(_) => unreachable!(),
 
        }
 
    }
 
    fn new_component(&mut self, args: &[Value], init_state: ComponentStateImpl) -> () {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => {
 
                let mut moved_keys = HashSet::new();
 
                for arg in args.iter() {
 
                    match arg {
 
                        Value::Output(OutputValue(key)) => { moved_keys.insert(*key); }
 
                        Value::Input(InputValue(key)) => { moved_keys.insert(*key); }
 
                        Value::Output(OutputValue(key)) => {
 
                            moved_keys.insert(*key);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            moved_keys.insert(*key);
 
                        }
 
                        _ => {}
 
                    }
 
                }
 
                context.new_component(moved_keys, init_state)
 
            }
 
            EvalContext::Poly(_) => unreachable!(),
 
        }
 
    }
 
    fn new_channel(&mut self) -> [Value; 2] {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => {
 
                let [from, to] = context.new_channel();
 
                let from = Value::Output(OutputValue(from));
 
                let to = Value::Input(InputValue(to));
 
                return [from, to];
 
            }
 
            EvalContext::Poly(_) => unreachable!()
 
            EvalContext::Poly(_) => unreachable!(),
 
        }
 
    }
 
    fn fires(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(_) => unreachable!(),
 
            EvalContext::Poly(context) => match port {
 
                Value::Output(OutputValue(key)) => context.is_firing(key).map(Value::from),
 
                Value::Input(InputValue(key)) => context.is_firing(key).map(Value::from),
 
                _ => unreachable!(),
 
            },
 
        }
 
    }
 
    fn get(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(_) => unreachable!(),
 
            EvalContext::Poly(context) => match port {
 
                Value::Output(OutputValue(key)) => {
 
                    context.read_msg(key).map(Value::receive_message)
 
                }
 
                Value::Input(InputValue(key)) => context.read_msg(key).map(Value::receive_message),
 
                _ => unreachable!(),
 
            },
 
        }
 
    }
 
}
src/runtime/connector.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{errors::*, *};
 

	
 
pub fn random_controller_id() -> ControllerId {
 
    type Bytes8 = [u8; std::mem::size_of::<ControllerId>()];
 
    let mut bytes = Bytes8::default();
 
    getrandom::getrandom(&mut bytes).unwrap();
 
    unsafe { std::mem::transmute::<Bytes8, ControllerId>(bytes) }
 
}
 

	
 
impl Default for Unconfigured {
 
    fn default() -> Self {
 
        let controller_id = random_controller_id();
 
        Self { controller_id }
 
    }
 
}
 
impl Default for Connector {
 
    fn default() -> Self {
 
        Self::Unconfigured(Unconfigured::default())
 
    }
 
}
 
impl Connector {
 
    /// Configure the Connector with the given Pdl description.
 
    pub fn configure(&mut self, pdl: &[u8], main_component: &[u8]) -> Result<(), ConfigErr> {
 
        use ConfigErr::*;
 
        let controller_id = match self {
 
            Connector::Configured(_) => return Err(AlreadyConfigured),
 
            Connector::Connected(_) => return Err(AlreadyConnected),
 
            Connector::Unconfigured(Unconfigured { controller_id }) => *controller_id,
 
        };
 
        let protocol_description = Arc::new(ProtocolD::parse(pdl).map_err(ParseErr)?);
 
        let polarities = protocol_description.component_polarities(main_component)?;
 
        let configured = Configured {
 
            controller_id,
 
            protocol_description,
 
            bindings: Default::default(),
 
            polarities,
 
            main_component: main_component.to_vec(),
 
            logger: "Logger created!\n".into(),
 
        };
 
        *self = Connector::Configured(configured);
 
        Ok(())
 
    }
 

	
 
    /// Bind the (configured) connector's port corresponding to the
 
    pub fn bind_port(
 
        &mut self,
 
        proto_port_index: usize,
 
        binding: PortBinding,
 
    ) -> Result<(), PortBindErr> {
 
        use PortBindErr::*;
 
        match self {
 
            Connector::Unconfigured { .. } => Err(NotConfigured),
 
            Connector::Connected(_) => Err(AlreadyConnected),
 
            Connector::Configured(configured) => {
 
                if configured.polarities.len() <= proto_port_index {
 
                    return Err(IndexOutOfBounds);
 
                }
 
                configured.bindings.insert(proto_port_index, binding);
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ConnectErr> {
 
        let deadline = Instant::now() + timeout;
 
        use ConnectErr::*;
 
        let configured = match self {
 
            Connector::Unconfigured { .. } => return Err(NotConfigured),
 
            Connector::Connected(_) => return Err(AlreadyConnected),
 
            Connector::Configured(configured) => configured,
 
        };
 
        // 1. Unwrap bindings or err
 
        let bound_proto_interface: Vec<(_, _)> = configured
 
            .polarities
 
            .iter()
 
            .copied()
 
            .enumerate()
 
            .map(|(native_index, polarity)| {
 
                let binding = configured
 
                    .bindings
 
                    .get(&native_index)
 
                    .copied()
 
                    .ok_or(PortNotBound { native_index })?;
 
                Ok((binding, polarity))
 
            })
 
            .collect::<Result<Vec<(_, _)>, ConnectErr>>()?;
 
        let (controller, native_interface) = Controller::connect(
 
            configured.controller_id,
 
            &configured.main_component,
 
            configured.protocol_description.clone(),
 
            &bound_proto_interface[..],
 
            &mut configured.logger,
 
            deadline,
 
        )?;
 
        *self = Connector::Connected(Connected {
 
            native_interface,
 
            sync_batches: vec![Default::default()],
 
            controller,
 
        });
 
        Ok(())
 
    }
 
    pub fn get_mut_logger(&mut self) -> Option<&mut String> {
 
        match self {
 
            Connector::Configured(configured) => Some(&mut configured.logger),
 
            Connector::Connected(connected) => Some(&mut connected.controller.inner.logger),
 
            _ => None,
 
        }
 
    }
 

	
 
    pub fn put(&mut self, native_port_index: usize, payload: Payload) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Putter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!");
 
        if sync_batch.puts.contains_key(&ekey) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.puts.insert(ekey, payload);
 
        Ok(())
 
    }
 

	
 
    pub fn get(&mut self, native_port_index: usize) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!");
 
        if sync_batch.gets.contains(&ekey) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.gets.insert(ekey);
 
        Ok(())
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, ()> {
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(()),
 
        };
 
        connected.sync_batches.push(SyncBatch::default());
 
        Ok(connected.sync_batches.len() - 2)
 
    }
 

	
 
    pub fn sync(&mut self, timeout: Duration) -> Result<usize, SyncErr> {
 
        let deadline = Instant::now() + timeout;
 
        use SyncErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 

	
 
        // do the synchronous round!
 
        let res =
 
            connected.controller.sync_round(Some(deadline), Some(connected.sync_batches.drain(..)));
 
        connected.sync_batches.push(SyncBatch::default());
 
        res?;
 
        Ok(connected.controller.inner.mono_n.result.as_mut().expect("qqqs").0)
 
    }
 

	
 
    pub fn read_gotten(&self, native_port_index: usize) -> Result<&[u8], ReadGottenErr> {
 
        use ReadGottenErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let &(key, polarity) =
 
            connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let result = connected.controller.inner.mono_n.result.as_ref().ok_or(NoPreviousRound)?;
 
        let payload = result.1.get(&key).ok_or(DidNotGet)?;
 
        Ok(payload)
 
        Ok(payload.as_slice())
 
    }
 
}
src/runtime/ffi.rs
Show inline comments
 
@@ -69,306 +69,306 @@ pub unsafe extern "C" fn connector_error_peek() -> *const c_char {
 
            stored_error.buf.as_ptr()
 
        } else {
 
            std::ptr::null()
 
        }
 
    })
 
}
 

	
 
/// Resets the error message buffer.
 
/// Returns:
 
/// - 0 if an error was cleared
 
/// - 1 if there was no error to clear
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub extern "C" fn connector_error_clear() -> c_int {
 
    LAST_ERROR.with(|stored_error| {
 
        let mut stored_error = stored_error.borrow_mut();
 
        if stored_error.filled {
 
            stored_error.buf.clear();
 
            stored_error.filled = false;
 
            0
 
        } else {
 
            1
 
        }
 
    })
 
}
 

	
 
/// Creates and returns Reowolf Connector structure allocated on the heap.
 
#[no_mangle]
 
pub extern "C" fn connector_new() -> *mut Connector {
 
    Box::into_raw(Box::new(Connector::default()))
 
}
 

	
 
/// Creates and returns Reowolf Connector structure allocated on the heap.
 
#[no_mangle]
 
pub extern "C" fn connector_with_controller_id(controller_id: ControllerId) -> *mut Connector {
 
    Box::into_raw(Box::new(Connector::Unconfigured(Unconfigured { controller_id })))
 
}
 

	
 
/// Configures the given Reowolf connector with a protocol description in PDL.
 
/// Returns:
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_configure(
 
    connector: *mut Connector,
 
    pdl: *mut c_char,
 
    main: *mut c_char,
 
) -> c_int {
 
    let mut b = Box::from_raw(connector); // unsafe!
 
    let ret = as_rust_bytes(pdl, |pdl_bytes| {
 
        as_rust_bytes(main, |main_bytes| match b.configure(pdl_bytes, main_bytes) {
 
            Ok(()) => 0,
 
            Err(e) => {
 
                overwrite_last_error(format!("{:?}", e).as_bytes());
 
                -1
 
            }
 
        })
 
    });
 
    Box::into_raw(b); // don't drop!
 
    ret
 
}
 

	
 
/// Provides a binding annotation for the port with the given index with "native":
 
/// (The port is exposed for reading and writing from the application)
 
/// Returns:
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_bind_native(
 
    connector: *mut Connector,
 
    proto_port_index: usize,
 
) -> c_int {
 
    // use PortBindErr::*;
 
    let mut b = Box::from_raw(connector); // unsafe!
 
    let ret = match b.bind_port(proto_port_index, PortBinding::Native) {
 
        Ok(()) => 0,
 
        Err(e) => {
 
            overwrite_last_error(format!("{:?}", e).as_bytes());
 
            -1
 
        }
 
    };
 
    Box::into_raw(b); // don't drop!
 
    ret
 
}
 

	
 
/// Provides a binding annotation for the port with the given index with "native":
 
/// (The port is exposed for reading and writing from the application)
 
/// Returns:
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_bind_passive(
 
    connector: *mut Connector,
 
    proto_port_index: c_uint,
 
    address: *const c_char,
 
) -> c_int {
 
    if let Some(addr) = try_parse_addr(address) {
 
        // use PortBindErr::*;
 
        let mut b = Box::from_raw(connector); // unsafe!
 
        let ret =
 
            match b.bind_port(proto_port_index.try_into().unwrap(), PortBinding::Passive(addr)) {
 
                Ok(()) => 0,
 
                Err(e) => {
 
                    overwrite_last_error(format!("{:?}", e).as_bytes());
 
                    -1
 
                }
 
            };
 
        Box::into_raw(b); // don't drop!
 
        ret
 
    } else {
 
        overwrite_last_error(b"Failed to parse input as ip address!");
 
        -1
 
    }
 
}
 

	
 
/// Provides a binding annotation for the port with the given index with "active":
 
/// (The port will conenct to a "passive" port at the given address during connect())
 
/// Returns:
 
/// - 0 for success
 
/// - 1 if the port was already bound and was left unchanged
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_bind_active(
 
    connector: *mut Connector,
 
    proto_port_index: c_uint,
 
    address: *const c_char,
 
) -> c_int {
 
    if let Some(addr) = try_parse_addr(address) {
 
        // use PortBindErr::*;
 
        let mut b = Box::from_raw(connector); // unsafe!
 
        let ret = match b.bind_port(proto_port_index.try_into().unwrap(), PortBinding::Active(addr))
 
        {
 
            Ok(()) => 0,
 
            Err(e) => {
 
                overwrite_last_error(format!("{:?}", e).as_bytes());
 
                -1
 
            }
 
        };
 
        Box::into_raw(b); // don't drop!
 
        ret
 
    } else {
 
        overwrite_last_error(b"Failed to parse input as ip address!");
 
        -1
 
    }
 
}
 

	
 
/// Provides a binding annotation for the port with the given index with "active":
 
/// (The port will conenct to a "passive" port at the given address during connect())
 
/// Returns:
 
/// - 0 SUCCESS: connected successfully
 
/// - TODO error codes
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_connect(
 
    connector: *mut Connector,
 
    timeout_millis: u64,
 
) -> c_int {
 
    let mut b = Box::from_raw(connector); // unsafe!
 
    let ret = match b.connect(Duration::from_millis(timeout_millis)) {
 
        Ok(()) => 0,
 
        Err(e) => {
 
            overwrite_last_error(format!("{:?}", e).as_bytes());
 
            -1
 
        }
 
    };
 
    Box::into_raw(b); // don't drop!
 
    ret
 
}
 

	
 
/// Destroys the given connector, freeing its underlying resources.
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_destroy(connector: *mut Connector) {
 
    let c = Box::from_raw(connector); // unsafe!
 
    drop(c); // for readability
 
}
 

	
 
/// Prepares to synchronously put a message at the given port, reading it from the given buffer.
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_put(
 
    connector: *mut Connector,
 
    proto_port_index: c_uint,
 
    buf_ptr: *mut c_uchar,
 
    msg_len: c_uint,
 
) -> c_int {
 
    let buf = std::slice::from_raw_parts_mut(buf_ptr, msg_len.try_into().unwrap());
 
    let payload = buf.to_vec(); // unsafe
 
    let vec: Vec<u8> = buf.to_vec(); // unsafe
 
    let mut b = Box::from_raw(connector); // unsafe!
 
    let ret = b.put(proto_port_index.try_into().unwrap(), payload);
 
    let ret = b.put(proto_port_index.try_into().unwrap(), vec.into());
 
    Box::into_raw(b); // don't drop!
 
    match ret {
 
        Ok(()) => 0,
 
        Err(e) => {
 
            overwrite_last_error(format!("{:?}", e).as_bytes());
 
            -1
 
        }
 
    }
 
}
 

	
 
/// Prepares to synchronously put a message at the given port, writing it to the given buffer.
 
/// - 0 SUCCESS
 
/// - 1 this port has the wrong direction
 
/// - 2 this port is already marked to get
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_get(
 
    connector: *mut Connector,
 
    proto_port_index: c_uint,
 
) -> c_int {
 
    let mut b = Box::from_raw(connector); // unsafe!
 
    let ret = b.get(proto_port_index.try_into().unwrap());
 
    Box::into_raw(b); // don't drop!
 
                      // use PortOperationErr::*;
 
    match ret {
 
        Ok(()) => 0,
 
        Err(e) => {
 
            overwrite_last_error(format!("{:?}", e).as_bytes());
 
            -1
 
        }
 
    }
 
}
 

	
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_gotten(
 
    connector: *mut Connector,
 
    proto_port_index: c_uint,
 
    buf_ptr_outptr: *mut *const c_uchar,
 
    len_outptr: *mut c_uint,
 
) -> c_int {
 
    let b = Box::from_raw(connector); // unsafe!
 
    let ret = b.read_gotten(proto_port_index.try_into().unwrap());
 
    // use ReadGottenErr::*;
 
    let result = match ret {
 
        Ok(ptr_slice) => {
 
            let buf_ptr = ptr_slice.as_ptr();
 
            let len = ptr_slice.len().try_into().unwrap();
 
            buf_ptr_outptr.write(buf_ptr);
 
            len_outptr.write(len);
 
            0
 
        }
 
        Err(e) => {
 
            overwrite_last_error(format!("{:?}", e).as_bytes());
 
            -1
 
        }
 
    };
 
    Box::into_raw(b); // don't drop!
 
    result
 
}
 

	
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_dump_log(connector: *mut Connector) -> c_int {
 
    let mut b = Box::from_raw(connector); // unsafe!
 
    let result = match b.get_mut_logger() {
 
        Some(s) => {
 
            println!("{}", s);
 
            0
 
        }
 
        None => 1,
 
    };
 
    Box::into_raw(b); // don't drop!
 
    result
 
}
 

	
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_next_batch(connector: *mut Connector) -> c_int {
 
    let mut b = Box::from_raw(connector); // unsafe!
 
    let result = match b.next_batch() {
 
        Ok(batch_index) => batch_index.try_into().unwrap(),
 
        Err(e) => {
 
            overwrite_last_error(format!("{:?}", e).as_bytes());
 
            -1
 
        }
 
    };
 
    Box::into_raw(b); // don't drop!
 
    result
 
}
 

	
 
/// # Safety
 
/// TODO
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_sync(connector: *mut Connector, timeout_millis: u64) -> c_int {
 
    let mut b = Box::from_raw(connector); // unsafe!
 
    let result = match b.sync(Duration::from_millis(timeout_millis)) {
 
        Ok(batch_index) => batch_index.try_into().unwrap(),
 
        Err(SyncErr::Timeout) => -1, // timeout!
 
        Err(e) => {
 
            overwrite_last_error(format!("{:?}", e).as_bytes());
 
            -2
 
        }
 
    };
 
    Box::into_raw(b); // don't drop!
 
    result
 
}
src/runtime/serde.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{
 
    endpoint::{CommMsg, CommMsgContents, Decision, EndpointInfo, Msg, SetupMsg},
 
    Predicate,
 
};
 
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 
use std::io::{ErrorKind::InvalidData, Read, Write};
 

	
 
pub trait Ser<T>: Write {
 
    fn ser(&mut self, t: &T) -> Result<(), std::io::Error>;
 
}
 
pub trait De<T>: Read {
 
    fn de(&mut self) -> Result<T, std::io::Error>;
 
}
 

	
 
pub struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
    }
 
}
 
impl<R: Read> MonitoredReader<R> {
 
    pub fn bytes_read(&self) -> usize {
 
        self.bytes
 
    }
 
}
 
impl<R: Read> Read for MonitoredReader<R> {
 
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 
        let n = self.r.read(buf)?;
 
        self.bytes += n;
 
        Ok(n)
 
    }
 
}
 

	
 
/////////////////////////////////////////
 

	
 
struct VarLenInt(u64);
 

	
 
macro_rules! ser_seq {
 
    ( $w:expr ) => {{
 
        io::Result::Ok(())
 
    }};
 
    ( $w:expr, $first:expr ) => {{
 
        $w.ser($first)
 
    }};
 
    ( $w:expr, $first:expr, $( $x:expr ),+ ) => {{
 
        $w.ser($first)?;
 
        ser_seq![$w, $( $x ),*]
 
    }};
 
}
 
/////////////////////////////////////////
 

	
 
impl<W: Write> Ser<bool> for W {
 
    fn ser(&mut self, t: &bool) -> Result<(), std::io::Error> {
 
        self.ser(&match t {
 
            true => b'T',
 
            false => b'F',
 
        })
 
    }
 
}
 
impl<R: Read> De<bool> for R {
 
    fn de(&mut self) -> Result<bool, std::io::Error> {
 
        let b: u8 = self.de()?;
 
        Ok(match b {
 
            b'T' => true,
 
            b'F' => false,
 
            _ => return Err(InvalidData.into()),
 
        })
 
    }
 
}
 

	
 
impl<W: Write> Ser<u8> for W {
 
    fn ser(&mut self, t: &u8) -> Result<(), std::io::Error> {
 
        self.write_u8(*t)
 
    }
 
}
 
impl<R: Read> De<u8> for R {
 
    fn de(&mut self) -> Result<u8, std::io::Error> {
 
        self.read_u8()
 
    }
 
}
 

	
 
impl<W: Write> Ser<u16> for W {
 
    fn ser(&mut self, t: &u16) -> Result<(), std::io::Error> {
 
        self.write_u16::<BigEndian>(*t)
 
    }
 
}
 
impl<R: Read> De<u16> for R {
 
    fn de(&mut self) -> Result<u16, std::io::Error> {
 
        self.read_u16::<BigEndian>()
 
    }
 
}
 

	
 
impl<W: Write> Ser<u32> for W {
 
    fn ser(&mut self, t: &u32) -> Result<(), std::io::Error> {
 
        self.write_u32::<BigEndian>(*t)
 
    }
 
}
 
impl<R: Read> De<u32> for R {
 
    fn de(&mut self) -> Result<u32, std::io::Error> {
 
        self.read_u32::<BigEndian>()
 
    }
 
}
 

	
 
impl<W: Write> Ser<u64> for W {
 
    fn ser(&mut self, t: &u64) -> Result<(), std::io::Error> {
 
        self.write_u64::<BigEndian>(*t)
 
    }
 
}
 
impl<R: Read> De<u64> for R {
 
    fn de(&mut self) -> Result<u64, std::io::Error> {
 
        self.read_u64::<BigEndian>()
 
    }
 
}
 

	
 
impl<W: Write> Ser<Payload> for W {
 
    fn ser(&mut self, t: &Payload) -> Result<(), std::io::Error> {
 
        self.ser(&VarLenInt(t.len() as u64))?;
 
        for byte in t {
 
        for byte in t.as_slice() {
 
            self.ser(byte)?;
 
        }
 
        Ok(())
 
    }
 
}
 
impl<R: Read> De<Payload> for R {
 
    fn de(&mut self) -> Result<Payload, std::io::Error> {
 
        let VarLenInt(len) = self.de()?;
 
        let mut x = Vec::with_capacity(len as usize);
 
        for _ in 0..len {
 
            x.push(self.de()?);
 
        }
 
        Ok(x)
 
        Ok(x.into())
 
    }
 
}
 

	
 
impl<W: Write> Ser<VarLenInt> for W {
 
    fn ser(&mut self, t: &VarLenInt) -> Result<(), std::io::Error> {
 
        integer_encoding::VarIntWriter::write_varint(self, t.0).map(|_| ())
 
    }
 
}
 
impl<R: Read> De<VarLenInt> for R {
 
    fn de(&mut self) -> Result<VarLenInt, std::io::Error> {
 
        integer_encoding::VarIntReader::read_varint(self).map(VarLenInt)
 
    }
 
}
 

	
 
impl<W: Write> Ser<ChannelId> for W {
 
    fn ser(&mut self, t: &ChannelId) -> Result<(), std::io::Error> {
 
        self.ser(&t.controller_id)?;
 
        self.ser(&VarLenInt(t.channel_index as u64))
 
    }
 
}
 
impl<R: Read> De<ChannelId> for R {
 
    fn de(&mut self) -> Result<ChannelId, std::io::Error> {
 
        Ok(ChannelId {
 
            controller_id: self.de()?,
 
            channel_index: De::<VarLenInt>::de(self)?.0 as ChannelIndex,
 
        })
 
    }
 
}
 

	
 
impl<W: Write> Ser<Predicate> for W {
 
    fn ser(&mut self, t: &Predicate) -> Result<(), std::io::Error> {
 
        self.ser(&VarLenInt(t.assigned.len() as u64))?;
 
        for (channel_id, boolean) in &t.assigned {
 
            ser_seq![self, channel_id, boolean]?;
 
        }
 
        Ok(())
 
    }
 
}
 
impl<R: Read> De<Predicate> for R {
 
    fn de(&mut self) -> Result<Predicate, std::io::Error> {
 
        let VarLenInt(len) = self.de()?;
 
        let mut assigned = BTreeMap::<ChannelId, bool>::default();
 
        for _ in 0..len {
 
            assigned.insert(self.de()?, self.de()?);
 
        }
 
        Ok(Predicate { assigned })
 
    }
 
}
 
impl<W: Write> Ser<Decision> for W {
 
    fn ser(&mut self, t: &Decision) -> Result<(), std::io::Error> {
 
        match t {
 
            Decision::Failure => self.ser(&b'F'),
 
            Decision::Success(predicate) => {
 
                self.ser(&b'S')?;
 
                self.ser(predicate)
 
            }
 
        }
 
    }
 
}
 
impl<R: Read> De<Decision> for R {
 
    fn de(&mut self) -> Result<Decision, std::io::Error> {
 
        let b: u8 = self.de()?;
 
        Ok(match b {
 
            b'F' => Decision::Failure,
 
            b'S' => Decision::Success(self.de()?),
 
            _ => return Err(InvalidData.into()),
 
        })
 
    }
 
}
 

	
 
impl<W: Write> Ser<Polarity> for W {
 
    fn ser(&mut self, t: &Polarity) -> Result<(), std::io::Error> {
 
        self.ser(&match t {
 
            Polarity::Putter => b'P',
 
            Polarity::Getter => b'G',
 
        })
 
    }
 
}
 
impl<R: Read> De<Polarity> for R {
 
    fn de(&mut self) -> Result<Polarity, std::io::Error> {
 
        let b: u8 = self.de()?;
 
        Ok(match b {
 
            b'P' => Polarity::Putter,
 
            b'G' => Polarity::Getter,
 
            _ => return Err(InvalidData.into()),
 
        })
 
    }
 
}
 

	
 
impl<W: Write> Ser<EndpointInfo> for W {
 
    fn ser(&mut self, t: &EndpointInfo) -> Result<(), std::io::Error> {
 
        let EndpointInfo { channel_id, polarity } = t;
 
        ser_seq![self, channel_id, polarity]
 
    }
 
}
 
impl<R: Read> De<EndpointInfo> for R {
 
    fn de(&mut self) -> Result<EndpointInfo, std::io::Error> {
 
        Ok(EndpointInfo { channel_id: self.de()?, polarity: self.de()? })
 
    }
 
}
 

	
 
impl<W: Write> Ser<Msg> for W {
 
    fn ser(&mut self, t: &Msg) -> Result<(), std::io::Error> {
 
        use {CommMsgContents::*, SetupMsg::*};
 
        match t {
 
            Msg::SetupMsg(s) => match s {
 
                // [flag, data]
 
                ChannelSetup { info } => ser_seq![self, &0u8, info],
 
                LeaderEcho { maybe_leader } => ser_seq![self, &1u8, maybe_leader],
 
                LeaderAnnounce { leader } => ser_seq![self, &2u8, leader],
 
                YouAreMyParent => ser_seq![self, &3u8],
 
            },
 
            Msg::CommMsg(CommMsg { round_index, contents }) => {
 
                // [flag, round_num, data]
 
                let varlenint = &VarLenInt(*round_index as u64);
 
                match contents {
 
                    SendPayload { payload_predicate, payload } => {
 
                        ser_seq![self, &4u8, varlenint, payload_predicate, payload]
 
                    }
 
                    Elaborate { partial_oracle } => ser_seq![self, &5u8, varlenint, partial_oracle],
 
                    Announce { decision } => ser_seq![self, &6u8, varlenint, decision],
 
                    Failure => ser_seq![self, &7u8, varlenint],
 
                }
 
            }
 
        }
 
    }
 
}
 
impl<R: Read> De<Msg> for R {
 
    fn de(&mut self) -> Result<Msg, std::io::Error> {
 
        use {CommMsgContents::*, SetupMsg::*};
 
        let b: u8 = self.de()?;
 
        Ok(match b {
 
            0..=3 => Msg::SetupMsg(match b {
 
                // [flag, data]
 
                0u8 => ChannelSetup { info: self.de()? },
 
                1u8 => LeaderEcho { maybe_leader: self.de()? },
 
                2u8 => LeaderAnnounce { leader: self.de()? },
 
                3u8 => YouAreMyParent,
 
                _ => unreachable!(),
 
            }),
 
            4..=7 => {
 
                // [flag, round_num, data]
 
                let VarLenInt(varlenint) = self.de()?;
 
                let contents = match b {
 
                    4u8 => SendPayload { payload_predicate: self.de()?, payload: self.de()? },
 
                    5u8 => Elaborate { partial_oracle: self.de()? },
 
                    6u8 => Announce { decision: self.de()? },
 
                    7u8 => Failure,
 
                    _ => unreachable!(),
 
                };
 
                Msg::CommMsg(CommMsg { round_index: varlenint as usize, contents })
 
            }
 
            _ => return Err(InvalidData.into()),
 
        })
 
    }
 
}
src/test/connector.rs
Show inline comments
 
extern crate test_generator;
 

	
 
use super::*;
 

	
 
use crate::common::*;
 
use crate::runtime::{errors::*, PortBinding::*};
 

	
 
static PDL: &[u8] = b"
 
primitive forward_once(in i, out o) {
 
    synchronous() put(o, get(i));
 
}
 
primitive blocked(in i, out o) {
 
    while(true) synchronous {}
 
}
 
primitive forward(in i, out o) {
 
    while(true) synchronous {
 
        put(o, get(i));
 
    }
 
}
 
primitive sync(in i, out o) {
 
    while(true) synchronous {
 
        if (fires(i)) put(o, get(i));
 
    }
 
}
 
primitive alternator_2(in i, out a, out b) {
 
    while(true) {
 
        synchronous { put(a, get(i)); }
 
        synchronous { put(b, get(i)); } 
 
    }
 
}
 
composite sync_2(in i, out o) {
 
    channel x -> y;
 
    new sync(i, x);
 
    new sync(y, o);
 
}
 
primitive exchange(in ai, out ao, in bi, out bo) {
 
    // Note the implicit causal relationship
 
    while(true) synchronous {
 
        if(fires(ai)) {
 
            put(bo, get(ai));
 
            put(ao, get(bi));
 
        }
 
    }
 
}
 
primitive filter(in i, out ok, out err) {
 
    while(true) synchronous {
 
        if (fires(i)) {
 
            msg m = get(i);
 
            if(m.length > 0) {
 
                put(ok, m);
 
            } else {
 
                put(err, m);
 
            } 
 
        }
 
    }
 
}
 
primitive token_spout(out o) {
 
    while(true) synchronous {
 
        put(o, create(0));
 
    }
 
}
 
primitive wait_n(int to_wait, out o) {
 
    while(to_wait > 0) synchronous() to_wait -= 1;
 
    synchronous { put(o, create(0)); }
 
}
 
composite wait_10(out o) {
 
    new wait_n(10, o);
 
}
 
primitive fifo_1(msg m, in i, out o) {
 
    while(true) synchronous {
 
        if (m == null && fires(i)) {
 
            m = get(i);
 
        } else if (m != null && fires(o)) {
 
            put(o, m);
 
            m = null;
 
        }
 
    }
 
}
 
composite fifo_1_e(in i, out o) {
 
    new fifo_1(null, i, o);
 
}
 
primitive samelen(in a, in b, out c) {
 
    synchronous {
 
        msg m = get(a);
 
        msg n = get(b);
 
        assert(m.length == n.length);
 
        put(c, m);
 
    }
 
}
 
primitive repl2(in a, out b, out c) {
 
    synchronous {
 
        msg m = get(a);
 
        put(b, m);
 
        put(c, m);
 
    }
 
}
 
composite samelen_repl(in a, out b) {
 
    channel c -> d;   
 
    channel e -> f;
 
    new samelen(a, f, c);
 
    new repl2(d, b, e);
 
}
 
";
 

	
 
#[test]
 
fn connector_connects_ok() {
 
    // Test if we can connect natives using the given PDL
 
    /*
 
    Alice -->silence--P|A-->silence--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    assert!(run_connector_set(&[
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_connected_but_silent_natives() {
 
    // Test if we can connect natives and have a trivial sync round
 
    /*
 
    Alice -->silence--P|A-->silence--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    assert!(run_connector_set(&[
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(Ok(0), x.sync(timeout));
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(Ok(0), x.sync(timeout));
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_self_forward_ok() {
 
    // Test a deterministic system
 
    // where a native has no network bindings
 
    // and sends messages to itself
 
    /*
 
        /-->\
 
    Alice   forward
 
        \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Echo!";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 
#[test]
 
fn connector_token_spout_ok() {
 
    // Test a deterministic system where the proto
 
    // creates token messages
 
    /*
 
    Alice<--token_spout
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    const N: usize = 5;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"token_spout").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(&[] as &[u8]), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_waiter_ok() {
 
    // Test a stateful proto that blocks port 0 for 10 rounds
 
    // and then sends a single token on the 11th
 
    /*
 
    Alice<--token_spout
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"wait_10").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..10 {
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 
            }
 
            x.get(0).unwrap();
 
            assert_eq!(Ok(0), x.sync(timeout));
 
            assert_eq!(Ok(&[] as &[u8]), x.read_gotten(0));
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_self_forward_timeout() {
 
    // Test a deterministic system
 
    // where a native has no network bindings
 
    // and sends messages to itself
 
    /*
 
        /-->\
 
    Alice   forward
 
        \<--/
 
    */
 
    let timeout = Duration::from_millis(500);
 
    static MSG: &[u8] = b"Echo!";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            x.put(0, MSG.to_vec()).unwrap();
 
            x.put(0, MSG.to_vec().into()).unwrap();
 
            // native and forward components cannot find a solution
 
            assert_eq!(Err(SyncErr::Timeout), x.sync(timeout));
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_forward_det() {
 
    // Test if a deterministic protocol and natives can pass one message
 
    /*
 
    Alice -->forward--P|A-->forward--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Hello!";
 

	
 
    assert!(run_connector_set(&[
 
        &|x| {
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
        &|x| {
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_nondet_proto_det_natives() {
 
    // Test the use of a nondeterministic protocol
 
    // where Alice decides the choice and the others conform
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Message, here!";
 
    assert!(run_connector_set(&[
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_putter_determines() {
 
    // putter and getter
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 3;
 
    static MSG: &[u8] = b"Hidey ho!";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                // batches [{0=>*}, {0=>?}]
 
                x.get(0).unwrap();
 
                x.next_batch().unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_getter_determines() {
 
    // putter and getter
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Hidey ho!";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                // batches [{0=>?}, {0=>*}]
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                x.next_batch().unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _i in 0..N {
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_alternator_2() {
 
    // Test a deterministic system which
 
    // alternates sending Sender's messages to A or B
 
    /*                    /--|-->A
 
    Sender -->alternator_2
 
                          \--|-->B
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"message";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"alternator_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                for _ in 0..2 {
 
                    x.put(0, MSG.to_vec()).unwrap();
 
                    x.put(0, MSG.to_vec().into()).unwrap();
 
                    assert_eq!(0, x.sync(timeout).unwrap());
 
                }
 
            }
 
        },
 
        &|x| {
 
            // A
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 

	
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 
            }
 
        },
 
        &|x| {
 
            // B
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 

	
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_composite_chain_a() {
 
    // Check if composition works. Forward messages through long chains
 
    /*
 
    Alice -->sync-->sync-->A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    static MSG: &[u8] = b"SSS";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_composite_chain_b() {
 
    // Check if composition works. Forward messages through long chains
 
    /*
 
    Alice -->sync-->sync-->A|P-->sync-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    static MSG: &[u8] = b"SSS";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.put(0, MSG.to_vec().into()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_exchange() {
 
    /*
 
        /-->\      /-->P|A-->\      /-->\
 
    Alice   exchange         exchange   Bob
 
        \<--/      \<--P|A<--/      \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Native).unwrap(); // native out
 
            x.bind_port(2, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(3, Passive(addrs[1])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Native).unwrap(); // native out
 
            x.bind_port(2, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(3, Active(addrs[0])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_both() {
 
    /* ------->   -----P|A---->   ------->
 
      / /--->\ \ / /---P|A-->\ \ / /--->\ \
 
    Alice    exchange       exchange     Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in a
 
            x.bind_port(1, Passive(addrs[0])).unwrap(); // peer out a
 
            x.bind_port(2, Native).unwrap(); // native in b
 
            x.bind_port(3, Passive(addrs[1])).unwrap(); // peer out b
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"one".to_vec()));
 
                assert_eq!(Ok(()), x.put(1, b"two".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"one".to_vec().into()));
 
                assert_eq!(Ok(()), x.put(1, b"two".to_vec().into()));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap(); // peer in a
 
            x.bind_port(1, Native).unwrap(); // native out b
 
            x.bind_port(2, Active(addrs[1])).unwrap(); // peer in b
 
            x.bind_port(3, Native).unwrap(); // native out a
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.get(0));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"one" as &[u8]), x.read_gotten(0));
 
                assert_eq!(Ok(b"two" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_routing_filter() {
 
    // Make a protocol whose behavior is a function of the contents of
 
    // a message. Here, the putter determines what is sent, and the proto
 
    // determines how it is routed
 
    /*
 
    Sender -->filter-->P|A-->sync--> Receiver
 
    */
 
    let timeout = Duration::from_millis(3_000);
 
    let addrs = [next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"filter").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Native).unwrap(); // err channel
 
            x.connect(timeout).unwrap();
 

	
 
            for i in (0..3).cycle().take(N) {
 
                // messages cycle [], [4], [4,4], ...
 
                let msg: Payload = std::iter::repeat(4).take(i).collect();
 

	
 
                // batch 0: passes through filter!
 
                x.put(0, msg.clone()).unwrap();
 
                x.next_batch().unwrap();
 

	
 
                // batch 1: gets returned!
 
                x.put(0, msg.clone()).unwrap();
 
                x.get(1).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_ne!(msg.len(), 0), // ok
 
                    1 => assert_eq!(msg.len(), 0), // err
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
        &|x| {
 
            // Receiver
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // empty batch
 
                x.next_batch().unwrap();
 

	
 
                // got a message
 
                x.get(0).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    1 => assert_ne!(Ok(&[] as &[u8]), x.read_gotten(0)),
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_fifo_1_e() {
 
    /*
 
        /-->\
 
    Alice   fifo_1
 
        \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    const N: usize = 10;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"fifo_1_e").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // put
 
                assert_eq!(Ok(()), x.put(0, b"message~".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"message~".to_vec().into()));
 
                assert_eq!(Ok(0), x.sync(timeout));
 

	
 
                // get
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"message~" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
#[should_panic]
 
fn connector_causal_loop() {
 
    /*
 
        /-->\      /-->P|A-->\      /-->\
 
    Alice   exchange         exchange   Bob
 
        \<--/      \<--P|A<--/      \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(1, Passive(addrs[1])).unwrap(); // peer in
 
            x.bind_port(2, Native).unwrap(); // native in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(1, Active(addrs[0])).unwrap(); // peer in
 
            x.bind_port(2, Native).unwrap(); // native in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
#[should_panic]
 
fn connector_causal_loop2() {
 
    /*
 
        /-->\     /<---\
 
    Alice   samelen-->repl
 
        \<-------------/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    // let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"samelen_repl").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"foo".to_vec()));
 
                assert_eq!(Ok(()), x.put(0, b"foo".to_vec().into()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_recover() {
 
    let connect_timeout = Duration::from_millis(1500);
 
    let comm_timeout = Duration::from_millis(300);
 
    let addrs = [next_addr()];
 
    fn putter_does(i: usize) -> bool {
 
        i % 3 == 0
 
    }
 
    fn getter_does(i: usize) -> bool {
 
        i % 2 == 0
 
    }
 
    fn expect_res(i: usize) -> Result<usize, SyncErr> {
 
        if putter_does(i) && getter_does(i) {
 
            Ok(0)
 
        } else {
 
            Err(SyncErr::Timeout)
 
        }
 
    }
 
    const N: usize = 11;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(connect_timeout).unwrap();
 

	
 
            for i in 0..N {
 
                if putter_does(i) {
 
                    assert_eq!(Ok(()), x.put(0, b"msg".to_vec()));
 
                    assert_eq!(Ok(()), x.put(0, b"msg".to_vec().into()));
 
                }
 
                assert_eq!(expect_res(i), x.sync(comm_timeout));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(connect_timeout).unwrap();
 

	
 
            for i in 0..N {
 
                if getter_does(i) {
 
                    assert_eq!(Ok(()), x.get(0));
 
                }
 
                assert_eq!(expect_res(i), x.sync(comm_timeout));
 
                if expect_res(i).is_ok() {
 
                    assert_eq!(Ok(b"msg" as &[u8]), x.read_gotten(0));
 
                }
 
            }
 
        },
 
    ]));
 
}
0 comments (0 inline, 0 general)