Changeset - c502fc0c252a
[Not reviewed]
0 3 0
MH - 4 years ago 2021-10-26 20:16:25
contact@maxhenger.nl
Preparing to fix 'delayed port transfer' bug
3 files changed with 75 insertions and 57 deletions:
0 comments (0 inline, 0 general)
src/runtime2/mod.rs
Show inline comments
 
@@ -289,6 +289,8 @@ impl RuntimeInner {
 

	
 
            match &created.connector {
 
                ConnectorVariant::UserDefined(connector) => {
 

	
 
                    println!("DEBUG: The connector {} owns the ports: {:?}", key.index, connector.ports.owned_ports.iter().map(|v| v.index).collect::<Vec<_>>());
 
                    for port_id in connector.ports.owned_ports.iter().copied() {
 
                        println!("DEBUG: Transferring port {:?} from {} to {}", port_id, created_by.context.id.0, key.index);
 
                        let port = created_by.context.remove_port(port_id);
src/runtime2/scheduler.rs
Show inline comments
 
@@ -123,17 +123,16 @@ impl Scheduler {
 
                                debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                // And respond with an Ack
 
                                self.runtime.send_message(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_id,
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
                                            content: ControlMessageVariant::Ack,
 
                                        }),
 
                                    }
 
                                );
 
                                let ack_message = Message{
 
                                    sending_connector: connector_id,
 
                                    receiving_port: PortIdLocal::new_invalid(),
 
                                    contents: MessageContents::Control(ControlMessage{
 
                                        id: content.id,
 
                                        content: ControlMessageVariant::Ack,
 
                                    }),
 
                                };
 
                                self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
 
                                self.runtime.send_message(message.sending_connector, ack_message);
 
                            },
 
                            ControlMessageVariant::CloseChannel(port_id) => {
 
                                // Mark the port as being closed
 
@@ -141,17 +140,16 @@ impl Scheduler {
 
                                port.state = PortState::Closed;
 

	
 
                                // Send an Ack
 
                                self.runtime.send_message(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_id,
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
                                            content: ControlMessageVariant::Ack,
 
                                        }),
 
                                    }
 
                                );
 
                                let ack_message = Message{
 
                                    sending_connector: connector_id,
 
                                    receiving_port: PortIdLocal::new_invalid(),
 
                                    contents: MessageContents::Control(ControlMessage{
 
                                        id: content.id,
 
                                        content: ControlMessageVariant::Ack,
 
                                    }),
 
                                };
 
                                self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
 
                                self.runtime.send_message(message.sending_connector, ack_message);
 

	
 
                            },
 
                            ControlMessageVariant::Ack => {
 
@@ -219,6 +217,7 @@ impl Scheduler {
 
                                port.self_id, port.peer_id,
 
                                connector_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message));
 
                            self.runtime.send_message(port.peer_connector, message);
 
                        }
 
                    }
 
@@ -243,7 +242,7 @@ impl Scheduler {
 
            for mut message in delta_state.outbox.drain(..) {
 
                // Based on the message contents, decide where the message
 
                // should be sent to. This might end up modifying the message.
 
                self.debug_conn(connector_id, &format!("Sending message\n --- {:?}", message));
 
                self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
 
                let (peer_connector, self_port, peer_port) = match &mut message {
 
                    MessageContents::Data(contents) => {
 
                        let port = cur_connector.context.get_port(contents.sending_port);
 
@@ -321,6 +320,7 @@ impl Scheduler {
 
                        port.peer_connector, new_connector.context.id
 
                    );
 

	
 
                    self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
 
                    self.runtime.send_message(port.peer_connector, reroute_message);
 
                }
 

	
 
@@ -465,8 +465,7 @@ impl ControlMessageHandler {
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for entry in &self.active {
 
            if let ControlVariant::ChangedPort(entry) = &entry.variant {
 
                if entry.source_connector == sending_connector &&
 
                    entry.target_port == target_port {
 
                if entry.target_port == target_port {
 
                    // Need to reroute this message
 
                    return Some(entry.target_connector);
 
                }
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -5,52 +5,69 @@ use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 

	
 
fn runtime_for(num_threads: u32, pdl: &str) -> Runtime {
 
const NUM_THREADS: u32 = 4;     // number of threads in runtime
 
const NUM_INSTANCES: u32 = 10;   // number of test instances constructed
 
const NUM_LOOPS: u32 = 1;       // number of loops within a single test (not used by all tests)
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(num_threads, protocol);
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
}
 

	
 
fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor: F) {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes())
 
        .expect("parse PDL");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    let mut api = runtime.create_interface();
 
    for _ in 0..NUM_INSTANCES {
 
        constructor(&mut api);
 
    }
 

	
 
    // Wait until done :)
 
}
 

	
 
#[test]
 
fn test_put_and_get() {
 
    let rt = runtime_for(4, "
 
primitive putter(out<bool> sender, u32 loops) {
 
    u32 index = 0;
 
    while (index < loops) {
 
        synchronous {
 
            print(\"putting!\");
 
            put(sender, true);
 
    const CODE: &'static str = "
 
    primitive putter(out<bool> sender, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                print(\"putting!\");
 
                put(sender, true);
 
            }
 
            index += 1;
 
        }
 
        index += 1;
 
    }
 
}
 

	
 
primitive getter(in<bool> receiver, u32 loops) {
 
    u32 index = 0;
 
    while (index < loops) {
 
        synchronous {
 
            print(\"getting!\");
 
            auto result = get(receiver);
 
            assert(result);
 
    primitive getter(in<bool> receiver, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                print(\"getting!\");
 
                auto result = get(receiver);
 
                assert(result);
 

	
 
            }
 
            index += 1;
 
        }
 
        index += 1;
 
    }
 
}
 
    ");
 
    ";
 

	
 
    let mut api = rt.create_interface();
 
    let channel = api.create_channel();
 
    let num_loops = 1;
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel();
 

	
 
    api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
        Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
        Value::UInt32(num_loops)
 
    ])).expect("create putter");
 
        api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
            Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create putter");
 

	
 
    api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
        Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
        Value::UInt32(num_loops)
 
    ])).expect("create getter");
 
        api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
            Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create getter");
 
    });
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)