From 4bfd6d133687d8d63e7b55620b15f319ac70fe6a 2020-09-29 09:22:35 From: Christopher Esterhuyse Date: 2020-09-29 09:22:35 Subject: [PATCH] more unit tests. minor bugfixes in protocol/eval --- diff --git a/src/protocol/eval.rs b/src/protocol/eval.rs index b89a1a442a3ae9963dfdf345b2fcb0ef887a9a5b..fd105d55a59c174201ec1665be91beee700f9aa5 100644 --- a/src/protocol/eval.rs +++ b/src/protocol/eval.rs @@ -59,7 +59,7 @@ impl Value { // Only messages within the expected length are allowed Value::Message(MessageValue(None)) } else { - Value::Message(MessageValue(Some(Payload::new(0)))) + Value::Message(MessageValue(Some(Payload::new(length as usize)))) } } _ => unimplemented!(), @@ -269,6 +269,17 @@ impl Value { (Value::Long(LongValue(s)), Value::Long(LongValue(o))) => { Value::Long(LongValue(*s + *o)) } + + (Value::Message(MessageValue(s)), Value::Message(MessageValue(o))) => { + let payload = if let [Some(s), Some(o)] = [s, o] { + let mut payload = s.clone(); + payload.concatenate_with(o); + Some(payload) + } else { + None + }; + Value::Message(MessageValue(payload)) + } _ => unimplemented!(), } } diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index f0ea80da75a74343044a206bdee2eb74f0c96980..1dddc8395a31fd2cfe788c22b4eb2eea2aebd726 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -105,7 +105,7 @@ impl ProtocolDescription { Ok(result) } // expects port polarities to be correct - pub(crate) fn new_main_component(&self, identifier: &[u8], ports: &[PortId]) -> ComponentState { + pub(crate) fn new_component(&self, identifier: &[u8], ports: &[PortId]) -> ComponentState { let mut args = Vec::new(); for (&x, y) in ports.iter().zip(self.component_polarities(identifier).unwrap()) { match y { diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 8cc96f1cfbb5f3c577b2237f1d7d8c04a8bad4e7..249d24aa72edaf9e017050f6eb6d2b7211d2c28d 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -657,8 +657,7 @@ impl Connector { // create a new component and identifier let cu = &mut self.unphased; let new_cid = cu.ips.id_manager.new_component_id(); - cu.proto_components - .insert(new_cid, cu.proto_description.new_main_component(identifier, ports)); + cu.proto_components.insert(new_cid, cu.proto_description.new_component(identifier, ports)); // update the ownership of moved ports for port in ports.iter() { match cu.ips.port_info.map.get_mut(port) { diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 35561a07596524f63836ed3c36fe668052eeb457..37dd16c8f019f4eb609b0ed1b92e6800a5bb12ef 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -270,26 +270,6 @@ fn setup_endpoints_and_pair_ports( const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); const RETRY_PERIOD: Duration = Duration::from_millis(200); - // The structure shared between this ("setup") thread and that of the waker. - // The waker thread periodically sends signals. - // struct WakerState { - // continue_signal: AtomicBool, - // waker: mio::Waker, - // } - // impl WakerState { - // // The waker thread runs this UNTIL the continue signal is set to false - // fn waker_loop(&self) { - // while self.continue_signal.load(SeqCst) { - // std::thread::sleep(WAKER_PERIOD); - // let _ = self.waker.wake(); - // } - // } - // // The setup thread thread runs this to set the continue signal to false. - // fn waker_stop(&self) { - // self.continue_signal.store(false, SeqCst); - // } - // } - // The data for a net endpoint's setup in progress struct NetTodo { // becomes completed once sent_local_port && recv_peer_port.is_some() @@ -378,17 +358,21 @@ fn setup_endpoints_and_pair_ports( }; // progress by reacting to poll events. continue until every endpoint is set up while !setup_incomplete.is_empty() { - // recompute the time left to poll for progress - let remaining = if let Some(deadline) = deadline { - deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?.min(RETRY_PERIOD) - } else { - RETRY_PERIOD + // recompute the timeout for the poll call + let remaining = match (deadline, net_connect_to_retry.is_empty()) { + (None, true) => None, + (None, false) => Some(RETRY_PERIOD), + (Some(deadline), is_empty) => { + let dur_to_timeout = + deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?; + Some(if is_empty { dur_to_timeout } else { dur_to_timeout.min(RETRY_PERIOD) }) + } }; // block until either // (a) `events` has been populated with 1+ elements // (b) timeout elapses, or // (c) RETRY_PERIOD elapses - poll.poll(&mut events, Some(remaining)).map_err(|_| Ce::PollFailed)?; + poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?; if last_retry_at.elapsed() > RETRY_PERIOD { // Retry all net connections and reset `last_retry_at` last_retry_at = Instant::now(); diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index c6075cd5eede3701425e3f722a6a229fbef1eada..913f12767c698475a148dc96f2629bdb0df361ea 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -1251,3 +1251,88 @@ fn xrouter_comp() { } println!("COMP {:?}", now.elapsed()); } + +#[test] +fn count_stream() { + let test_log_path = Path::new("./logs/count_stream"); + let pdl = b" + primitive count_stream(out o) { + msg m = create(1); + m[0] = 0; + while(true) synchronous { + put(o, m); + m[0] += 1; + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. + let [p0, g0] = c.new_port_pair(); + c.add_component(b"count_stream", &[p0]).unwrap(); + c.connect(None).unwrap(); + + for expecting in 0u8..16 { + c.get(g0).unwrap(); + c.sync(None).unwrap(); + assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice()); + } +} + +#[test] +fn for_msg_byte() { + let test_log_path = Path::new("./logs/for_msg_byte"); + let pdl = b" + primitive for_msg_byte(out o) { + byte i = 0; + while(i<8) { + msg m = create(1); + m[0] = i; + synchronous() put(o, m); + i++; + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. + let [p0, g0] = c.new_port_pair(); + c.add_component(b"for_msg_byte", &[p0]).unwrap(); + c.connect(None).unwrap(); + + for expecting in 0u8..8 { + c.get(g0).unwrap(); + c.sync(None).unwrap(); + assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice()); + } + c.sync(None).unwrap(); +} + +#[test] +fn message_concat() { + // Note: PDL quirks: + // 1. declarations as first lines of a scope + // 2. var names cannot be prefixed by types. Eg `msg_concat` prohibited. + let test_log_path = Path::new("./logs/message_concat"); + let pdl = b" + primitive message_concat(out o) { + msg a = create(1); + msg b = create(1); + a[0] = 0; + b[0] = 1; + synchronous() put(o, a+b); + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. + let [p0, g0] = c.new_port_pair(); + c.add_component(b"message_concat", &[p0]).unwrap(); + c.connect(None).unwrap(); + c.get(g0).unwrap(); + c.sync(None).unwrap(); + assert_eq!(&[0, 1], c.gotten(g0).unwrap().as_slice()); +}