diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 2622104e2d5aa40d557ffa26ec7c5fc44cd8908d..2acf1503e1dc5a26036832be06d56b3c97f74719 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -289,6 +289,8 @@ impl RuntimeInner { match &created.connector { ConnectorVariant::UserDefined(connector) => { + + println!("DEBUG: The connector {} owns the ports: {:?}", key.index, connector.ports.owned_ports.iter().map(|v| v.index).collect::>()); for port_id in connector.ports.owned_ports.iter().copied() { println!("DEBUG: Transferring port {:?} from {} to {}", port_id, created_by.context.id.0, key.index); let port = created_by.context.remove_port(port_id); diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 70274990203972a1013b5025d7720cdd57369f05..be781d33c4b90d74713c508c50d9e396d52ca4eb 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -123,17 +123,16 @@ impl Scheduler { debug_assert!(delta_state.outbox.is_empty()); // And respond with an Ack - self.runtime.send_message( - message.sending_connector, - Message{ - sending_connector: connector_id, - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Control(ControlMessage{ - id: content.id, - content: ControlMessageVariant::Ack, - }), - } - ); + let ack_message = Message{ + sending_connector: connector_id, + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Control(ControlMessage{ + id: content.id, + content: ControlMessageVariant::Ack, + }), + }; + self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message)); + self.runtime.send_message(message.sending_connector, ack_message); }, ControlMessageVariant::CloseChannel(port_id) => { // Mark the port as being closed @@ -141,17 +140,16 @@ impl Scheduler { port.state = PortState::Closed; // Send an Ack - self.runtime.send_message( - message.sending_connector, - Message{ - sending_connector: connector_id, - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Control(ControlMessage{ - id: content.id, - content: ControlMessageVariant::Ack, - }), - } - ); + let ack_message = Message{ + sending_connector: connector_id, + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Control(ControlMessage{ + id: content.id, + content: ControlMessageVariant::Ack, + }), + }; + self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message)); + self.runtime.send_message(message.sending_connector, ack_message); }, ControlMessageVariant::Ack => { @@ -219,6 +217,7 @@ impl Scheduler { port.self_id, port.peer_id, connector_id ); + self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message)); self.runtime.send_message(port.peer_connector, message); } } @@ -243,7 +242,7 @@ impl Scheduler { for mut message in delta_state.outbox.drain(..) { // Based on the message contents, decide where the message // should be sent to. This might end up modifying the message. - self.debug_conn(connector_id, &format!("Sending message\n --- {:?}", message)); + self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message)); let (peer_connector, self_port, peer_port) = match &mut message { MessageContents::Data(contents) => { let port = cur_connector.context.get_port(contents.sending_port); @@ -321,6 +320,7 @@ impl Scheduler { port.peer_connector, new_connector.context.id ); + self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message)); self.runtime.send_message(port.peer_connector, reroute_message); } @@ -465,8 +465,7 @@ impl ControlMessageHandler { pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option { for entry in &self.active { if let ControlVariant::ChangedPort(entry) = &entry.variant { - if entry.source_connector == sending_connector && - entry.target_port == target_port { + if entry.target_port == target_port { // Need to reroute this message return Some(entry.target_connector); } diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 794f42274ffbd045070e49049e068e1c613ba26f..04c61793c086384b81c5694d4a90c18e7a4912dc 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -5,52 +5,69 @@ use crate::{PortId, ProtocolDescription}; use crate::common::Id; use crate::protocol::eval::*; -fn runtime_for(num_threads: u32, pdl: &str) -> Runtime { +const NUM_THREADS: u32 = 4; // number of threads in runtime +const NUM_INSTANCES: u32 = 10; // number of test instances constructed +const NUM_LOOPS: u32 = 1; // number of loops within a single test (not used by all tests) + +fn create_runtime(pdl: &str) -> Runtime { let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl"); - let runtime = Runtime::new(num_threads, protocol); + let runtime = Runtime::new(NUM_THREADS, protocol); return runtime; } +fn run_test_in_runtime(pdl: &str, constructor: F) { + let protocol = ProtocolDescription::parse(pdl.as_bytes()) + .expect("parse PDL"); + let runtime = Runtime::new(NUM_THREADS, protocol); + + let mut api = runtime.create_interface(); + for _ in 0..NUM_INSTANCES { + constructor(&mut api); + } + + // Wait until done :) +} + #[test] fn test_put_and_get() { - let rt = runtime_for(4, " -primitive putter(out sender, u32 loops) { - u32 index = 0; - while (index < loops) { - synchronous { - print(\"putting!\"); - put(sender, true); + const CODE: &'static str = " + primitive putter(out sender, u32 loops) { + u32 index = 0; + while (index < loops) { + synchronous { + print(\"putting!\"); + put(sender, true); + } + index += 1; } - index += 1; } -} -primitive getter(in receiver, u32 loops) { - u32 index = 0; - while (index < loops) { - synchronous { - print(\"getting!\"); - auto result = get(receiver); - assert(result); + primitive getter(in receiver, u32 loops) { + u32 index = 0; + while (index < loops) { + synchronous { + print(\"getting!\"); + auto result = get(receiver); + assert(result); + } + index += 1; } - index += 1; } -} - "); + "; - let mut api = rt.create_interface(); - let channel = api.create_channel(); - let num_loops = 1; + run_test_in_runtime(CODE, |api| { + let channel = api.create_channel(); - api.create_connector("", "putter", ValueGroup::new_stack(vec![ - Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })), - Value::UInt32(num_loops) - ])).expect("create putter"); + api.create_connector("", "putter", ValueGroup::new_stack(vec![ + Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })), + Value::UInt32(NUM_LOOPS) + ])).expect("create putter"); - api.create_connector("", "getter", ValueGroup::new_stack(vec![ - Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })), - Value::UInt32(num_loops) - ])).expect("create getter"); + api.create_connector("", "getter", ValueGroup::new_stack(vec![ + Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })), + Value::UInt32(NUM_LOOPS) + ])).expect("create getter"); + }); } \ No newline at end of file