Changeset - 4bfd6d133687
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-09-29 09:22:35
christopher.esterhuyse@gmail.com
more unit tests. minor bugfixes in protocol/eval
5 files changed with 109 insertions and 30 deletions:
0 comments (0 inline, 0 general)
src/protocol/eval.rs
Show inline comments
 
@@ -50,25 +50,25 @@ pub enum Value {
 
impl Value {
 
    pub fn receive_message(buffer: &Payload) -> Value {
 
        Value::Message(MessageValue(Some(buffer.clone())))
 
    }
 
    fn create_message(length: Value) -> Value {
 
        match length {
 
            Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => {
 
                let length: i64 = i64::from(length);
 
                if length < 0 || length > MESSAGE_MAX_LENGTH {
 
                    // Only messages within the expected length are allowed
 
                    Value::Message(MessageValue(None))
 
                } else {
 
                    Value::Message(MessageValue(Some(Payload::new(0))))
 
                    Value::Message(MessageValue(Some(Payload::new(length as usize))))
 
                }
 
            }
 
            _ => unimplemented!(),
 
        }
 
    }
 
    fn from_constant(constant: &Constant) -> Value {
 
        match constant {
 
            Constant::Null => Value::Message(MessageValue(None)),
 
            Constant::True => Value::Boolean(BooleanValue(true)),
 
            Constant::False => Value::Boolean(BooleanValue(false)),
 
            Constant::Integer(data) => {
 
                // Convert raw ASCII data to UTF-8 string
 
@@ -260,24 +260,35 @@ impl Value {
 
            (Value::Long(LongValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Long(LongValue(*s + *o as i64))
 
            }
 
            (Value::Long(LongValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Long(LongValue(*s + *o as i64))
 
            }
 
            (Value::Long(LongValue(s)), Value::Int(IntValue(o))) => {
 
                Value::Long(LongValue(*s + *o as i64))
 
            }
 
            (Value::Long(LongValue(s)), Value::Long(LongValue(o))) => {
 
                Value::Long(LongValue(*s + *o))
 
            }
 

	
 
            (Value::Message(MessageValue(s)), Value::Message(MessageValue(o))) => {
 
                let payload = if let [Some(s), Some(o)] = [s, o] {
 
                    let mut payload = s.clone();
 
                    payload.concatenate_with(o);
 
                    Some(payload)
 
                } else {
 
                    None
 
                };
 
                Value::Message(MessageValue(payload))
 
            }
 
            _ => unimplemented!(),
 
        }
 
    }
 
    fn minus(&self, other: &Value) -> Value {
 
        match (self, other) {
 
            (Value::Byte(ByteValue(s)), Value::Byte(ByteValue(o))) => {
 
                Value::Byte(ByteValue(*s - *o))
 
            }
 
            (Value::Byte(ByteValue(s)), Value::Short(ShortValue(o))) => {
 
                Value::Short(ShortValue(*s as i16 - *o))
 
            }
 
            (Value::Byte(ByteValue(s)), Value::Int(IntValue(o))) => {
src/protocol/mod.rs
Show inline comments
 
@@ -96,25 +96,25 @@ impl ProtocolDescription {
 
            let ptype = &type_annot.the_type.primitive;
 
            if ptype == &PrimitiveType::Input {
 
                result.push(Polarity::Getter)
 
            } else if ptype == &PrimitiveType::Output {
 
                result.push(Polarity::Putter)
 
            } else {
 
                unreachable!()
 
            }
 
        }
 
        Ok(result)
 
    }
 
    // expects port polarities to be correct
 
    pub(crate) fn new_main_component(&self, identifier: &[u8], ports: &[PortId]) -> ComponentState {
 
    pub(crate) fn new_component(&self, identifier: &[u8], ports: &[PortId]) -> ComponentState {
 
        let mut args = Vec::new();
 
        for (&x, y) in ports.iter().zip(self.component_polarities(identifier).unwrap()) {
 
            match y {
 
                Polarity::Getter => args.push(Value::Input(InputValue(x))),
 
                Polarity::Putter => args.push(Value::Output(OutputValue(x))),
 
            }
 
        }
 
        let h = &self.heap;
 
        let root = &h[self.root];
 
        let def = root.get_definition_ident(h, identifier).unwrap();
 
        ComponentState { prompt: Prompt::new(h, def, &args) }
 
    }
src/runtime/mod.rs
Show inline comments
 
@@ -648,26 +648,25 @@ impl Connector {
 
            let info = cu.ips.port_info.map.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            if info.owner != cu.native_component_id {
 
                return Err(Ace::UnknownPort(port));
 
            }
 
            if info.polarity != expected_polarity {
 
                return Err(Ace::WrongPortPolarity { port, expected_polarity });
 
            }
 
        }
 
        // No errors! Time to modify `cu`
 
        // create a new component and identifier
 
        let cu = &mut self.unphased;
 
        let new_cid = cu.ips.id_manager.new_component_id();
 
        cu.proto_components
 
            .insert(new_cid, cu.proto_description.new_main_component(identifier, ports));
 
        cu.proto_components.insert(new_cid, cu.proto_description.new_component(identifier, ports));
 
        // update the ownership of moved ports
 
        for port in ports.iter() {
 
            match cu.ips.port_info.map.get_mut(port) {
 
                Some(port_info) => port_info.owner = new_cid,
 
                None => unreachable!(),
 
            }
 
        }
 
        Ok(())
 
    }
 
}
 
impl Predicate {
 
    #[inline]
src/runtime/setup.rs
Show inline comments
 
@@ -261,44 +261,24 @@ impl Connector {
 
// - new information about ports acquired through the newly-created channels
 
fn setup_endpoints_and_pair_ports(
 
    logger: &mut dyn Logger,
 
    net_endpoint_setups: &[NetEndpointSetup],
 
    udp_endpoint_setups: &[UdpEndpointSetup],
 
    port_info: &PortInfoMap,
 
    deadline: &Option<Instant>,
 
) -> Result<(EndpointManager, ExtraPortInfo), ConnectError> {
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    const RETRY_PERIOD: Duration = Duration::from_millis(200);
 

	
 
    // The structure shared between this ("setup") thread and that of the waker.
 
    // The waker thread periodically sends signals.
 
    // struct WakerState {
 
    //     continue_signal: AtomicBool,
 
    //     waker: mio::Waker,
 
    // }
 
    // impl WakerState {
 
    //     // The waker thread runs this UNTIL the continue signal is set to false
 
    //     fn waker_loop(&self) {
 
    //         while self.continue_signal.load(SeqCst) {
 
    //             std::thread::sleep(WAKER_PERIOD);
 
    //             let _ = self.waker.wake();
 
    //         }
 
    //     }
 
    //     // The setup thread thread runs this to set the continue signal to false.
 
    //     fn waker_stop(&self) {
 
    //         self.continue_signal.store(false, SeqCst);
 
    //     }
 
    // }
 

	
 
    // The data for a net endpoint's setup in progress
 
    struct NetTodo {
 
        // becomes completed once sent_local_port && recv_peer_port.is_some()
 
        // we send local port if we haven't already and we receive a writable event
 
        // we recv peer port if we haven't already and we receive a readbale event
 
        todo_endpoint: NetTodoEndpoint,
 
        endpoint_setup: NetEndpointSetup,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 

	
 
    // The data for a udp endpoint's setup in progress
 
@@ -369,35 +349,39 @@ fn setup_endpoints_and_pair_ports(
 

	
 
    // Initially no net connections have failed, and all udp and net endpoint setups are incomplete
 
    let mut net_connect_to_retry: HashSet<usize> = Default::default();
 
    let mut setup_incomplete: HashSet<TokenTarget> = {
 
        let net_todo_targets_iter =
 
            (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index });
 
        let udp_todo_targets_iter =
 
            (0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index });
 
        net_todo_targets_iter.chain(udp_todo_targets_iter).collect()
 
    };
 
    // progress by reacting to poll events. continue until every endpoint is set up
 
    while !setup_incomplete.is_empty() {
 
        // recompute the time left to poll for progress
 
        let remaining = if let Some(deadline) = deadline {
 
            deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?.min(RETRY_PERIOD)
 
        } else {
 
            RETRY_PERIOD
 
        // recompute the timeout for the poll call
 
        let remaining = match (deadline, net_connect_to_retry.is_empty()) {
 
            (None, true) => None,
 
            (None, false) => Some(RETRY_PERIOD),
 
            (Some(deadline), is_empty) => {
 
                let dur_to_timeout =
 
                    deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?;
 
                Some(if is_empty { dur_to_timeout } else { dur_to_timeout.min(RETRY_PERIOD) })
 
            }
 
        };
 
        // block until either
 
        // (a) `events` has been populated with 1+ elements
 
        // (b) timeout elapses, or
 
        // (c) RETRY_PERIOD elapses
 
        poll.poll(&mut events, Some(remaining)).map_err(|_| Ce::PollFailed)?;
 
        poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?;
 
        if last_retry_at.elapsed() > RETRY_PERIOD {
 
            // Retry all net connections and reset `last_retry_at`
 
            last_retry_at = Instant::now();
 
            for net_index in net_connect_to_retry.drain() {
 
                // Restart connect procedure for this net endpoint
 
                let net_todo = &mut net_todos[net_index];
 
                log!(
 
                    logger,
 
                    "Restarting connection with endpoint {:?} {:?}",
 
                    net_index,
 
                    net_todo.endpoint_setup.sock_addr
 
                );
src/runtime/tests.rs
Show inline comments
 
@@ -1242,12 +1242,97 @@ fn xrouter_comp() {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g1).unwrap();
 
            }
 
            XRouterItem::GetB => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g2).unwrap();
 
            }
 
        }
 
        assert_eq!(0, c.sync(SEC1).unwrap());
 
    }
 
    println!("COMP {:?}", now.elapsed());
 
}
 

	
 
#[test]
 
fn count_stream() {
 
    let test_log_path = Path::new("./logs/count_stream");
 
    let pdl = b"
 
    primitive count_stream(out o) {
 
        msg m = create(1);
 
        m[0] = 0;
 
        while(true) synchronous {
 
            put(o, m);
 
            m[0] += 1;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"count_stream", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..16 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
}
 

	
 
#[test]
 
fn for_msg_byte() {
 
    let test_log_path = Path::new("./logs/for_msg_byte");
 
    let pdl = b"
 
    primitive for_msg_byte(out o) {
 
        byte i = 0;
 
        while(i<8) {
 
            msg m = create(1);
 
            m[0] = i;
 
            synchronous() put(o, m);
 
            i++;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"for_msg_byte", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..8 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
    c.sync(None).unwrap();
 
}
 

	
 
#[test]
 
fn message_concat() {
 
    // Note: PDL quirks:
 
    // 1. declarations as first lines of a scope
 
    // 2. var names cannot be prefixed by types. Eg `msg_concat` prohibited.
 
    let test_log_path = Path::new("./logs/message_concat");
 
    let pdl = b"
 
    primitive message_concat(out o) {
 
        msg a = create(1);
 
        msg b = create(1);
 
        a[0] = 0;
 
        b[0] = 1;
 
        synchronous() put(o, a+b);
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"message_concat", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 
    c.get(g0).unwrap();
 
    c.sync(None).unwrap();
 
    assert_eq!(&[0, 1], c.gotten(g0).unwrap().as_slice());
 
}
0 comments (0 inline, 0 general)