Changeset - ceaa946df1eb
[Not reviewed]
0 4 0
MH - 4 years ago 2021-10-25 19:44:46
contact@maxhenger.nl
WIP on fixing reroute bug
4 files changed with 5 insertions and 7 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -278,13 +278,12 @@ impl ConnectorPorts {
 
    // based on branch/port index.
 
    #[inline]
 
    fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize {
 
        let branch_idx = branch_idx as usize;
 
        let num_ports = self.owned_ports.len();
 

	
 
        println!("port_idx = {}, branch_idx = {}, num_ports = {}, port_mapping.len() = {}", port_idx, branch_idx, num_ports, self.port_mapping.len());
 
        debug_assert!(port_idx < num_ports);
 
        debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len());
 

	
 
        return branch_idx * num_ports + port_idx;
 
    }
 
}
src/runtime2/native.rs
Show inline comments
 
@@ -130,13 +130,13 @@ impl ApplicationInterface {
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Creates a new connector. Note that it is not scheduled immediately, but
 
    /// depends on the `ApplicationConnector` to run, followed by the created
 
    /// connector being scheduled.
 
    // TODO: Optimize by yanking out scheduler logic for common use.
 
    // TODO: Yank out scheduler logic for common use.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
src/runtime2/scheduler.rs
Show inline comments
 
@@ -70,13 +70,12 @@ impl Scheduler {
 
        return Self{ runtime, scheduler_id };
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        let scheduler_id = self.scheduler_id;
 
        let mut delta_state = RunDeltaState::new();
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            self.debug("Waiting for work");
 
            let connector_key = self.runtime.wait_for_work();
 
@@ -97,18 +96,20 @@ impl Scheduler {
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    // Check for rerouting
 
                    self.debug_conn(connector_id, &format!("Handling message from {}:{}\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
 
                    self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
 
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                        self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0));
 
                        self.runtime.send_message(other_connector_id, message);
 
                        continue;
 
                    }
 

	
 
                    self.debug_conn(connector_id, " ... Handling message myself");
 
                    // Check for messages that requires special action from the
 
                    // scheduler.
 
                    if let MessageContents::Control(content) = message.contents {
 
                        match content.content {
 
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                // Need to change port target
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -39,20 +39,18 @@ primitive getter(in<bool> receiver, u32 loops) {
 
    }
 
}
 
    ");
 

	
 
    let mut api = rt.create_interface();
 
    let channel = api.create_channel();
 
    let num_loops = 100;
 
    let num_loops = 1;
 

	
 
    api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
        Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
        Value::UInt32(num_loops)
 
    ])).expect("create putter");
 

	
 
    api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
        Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
        Value::UInt32(num_loops)
 
    ])).expect("create getter");
 

	
 
    println!("Am I running?");
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)