Changeset - 98bb2b044b92
[Not reviewed]
0 1 0
MH - 3 years ago 2022-05-19 18:07:47
contact@maxhenger.nl
Fix bug related to transferred data messages
1 file changed with 16 insertions and 10 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -449,32 +449,38 @@ pub(crate) fn default_handle_received_data_message(
 
    debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it
 

	
 
    // If we received any ports, add them to the port tracking and inbox struct.
 
    // Then notify the peers that they can continue sending to this port, but
 
    // now at a new address.
 
    for received_port in &mut message.ports {
 
        // Transfer messages to main/backup inbox
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
            inbox_backup.extend(received_port.messages.drain(..));
 
        } else {
 
            inbox_main.push(None);
 
        }
 

	
 
        // Create a new port locally
 
        let mut new_port_state = received_port.state;
 
        new_port_state.set(PortStateFlag::Received);
 
        let new_port_handle = comp_ctx.add_port(
 
            received_port.peer_comp, received_port.peer_port,
 
            received_port.kind, new_port_state
 
        );
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp));
 
        let new_port = comp_ctx.get_port(new_port_handle);
 

	
 
        // Transfer messages to main/backup inbox. Make sure to modify the
 
        // target port to our own
 
        for message in received_port.messages.iter_mut() {
 
            message.data_header.target_port = new_port.self_id;
 
        }
 

	
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
            inbox_backup.extend(received_port.messages.drain(..));
 
        } else {
 
            inbox_main.push(None);
 
        }
 

	
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 

	
 
        // Add the port tho the consensus
 
        consensus.notify_of_new_port(_new_inbox_index, new_port_handle, comp_ctx);
 

	
 
        // Replace all references to the port in the received message
 
        for message_location in received_port.locations.iter().copied() {
 
            let value = message.content.get_value_mut(message_location);
0 comments (0 inline, 0 general)