Changeset - 14f5de1d394a
[Not reviewed]
0 4 0
MH - 3 years ago 2022-05-13 18:55:40
contact@maxhenger.nl
Fix bug with port transfer when transferred ports are blocked
4 files changed with 106 insertions and 49 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -83,8 +83,9 @@ pub(crate) enum CompMode {
 
    BlockedGet, // blocked because we need to receive a message on a particular port
 
    BlockedPut, // component is blocked because the port is blocked
 
    BlockedSelect, // waiting on message to complete the select statement
 
    BlockedPutPortsAwaitingAcks,// blocked because we're waiting to send a data message containing ports, but first need to receive Acks for the PortPeerChanged messages
 
    BlockedPutPortsReady, // blocked because we're waitingto send a data message containing ports
 
    PutPortsBlockedTransferredPorts, // sending a message with ports, those sent ports are (partly) blocked
 
    PutPortsBlockedAwaitingAcks, // sent out PPC message for blocking transferred ports, now awaiting Acks
 
    PutPortsBlockedSendingPort, // sending a message with ports, message sent through a still-blocked port
 
    NewComponentBlocked, // waiting until ports are in the appropriate state to create a new component
 
    StartExit, // temporary state: if encountered then we start the shutdown process.
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish
 
@@ -97,7 +98,9 @@ impl CompMode {
 

	
 
        match self {
 
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect |
 
                BlockedPutPortsAwaitingAcks | BlockedPutPortsReady => true,
 
                PutPortsBlockedTransferredPorts |
 
                PutPortsBlockedAwaitingAcks |
 
                PutPortsBlockedSendingPort => true,
 
            NonSync | NewComponentBlocked | StartExit | BusyExit | Exit => false,
 
        }
 
    }
 
@@ -107,7 +110,9 @@ impl CompMode {
 

	
 
        match self {
 
            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect |
 
                BlockedPutPortsAwaitingAcks | BlockedPutPortsReady |
 
            PutPortsBlockedTransferredPorts |
 
            PutPortsBlockedAwaitingAcks |
 
            PutPortsBlockedSendingPort |
 
                NewComponentBlocked => false,
 
            StartExit | BusyExit => true,
 
            Exit => false,
 
@@ -197,7 +202,7 @@ impl CompExecState {
 
    }
 

	
 
    pub(crate) fn set_as_blocked_put_with_ports(&mut self, port: PortId, value: ValueGroup) {
 
        self.mode = CompMode::BlockedPutPortsAwaitingAcks;
 
        self.mode = CompMode::PutPortsBlockedTransferredPorts;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 
@@ -208,12 +213,6 @@ impl CompExecState {
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_put_with_ports(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedPutPortsReady &&
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_create_component(&self) -> bool {
 
        return self.mode == CompMode::NewComponentBlocked;
 
    }
 
@@ -293,7 +292,7 @@ pub(crate) fn default_send_data_message(
 
            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
 
        ))
 
    } else if !ports.is_empty() {
 
        prepare_send_message_with_ports(
 
        start_send_message_with_ports(
 
            transmitting_port_id, port_instruction, value, exec_state,
 
            comp_ctx, sched_ctx, control
 
        )?;
 
@@ -544,7 +543,7 @@ pub(crate) fn default_handle_control_message(
 
) -> Result<(), (PortInstruction, String)> {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup);
 
            default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?;
 
        },
 
        ControlMessageContent::BlockPort => {
 
            // One of our messages was accepted, but the port should be
 
@@ -577,7 +576,7 @@ pub(crate) fn default_handle_control_message(
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup);
 
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?;
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
@@ -625,7 +624,7 @@ pub(crate) fn default_handle_control_message(
 
            default_handle_recently_unblocked_port(
 
                exec_state, control, consensus, port_handle, sched_ctx,
 
                comp_ctx, inbox_main, inbox_backup
 
            );
 
            )?;
 
        },
 
        ControlMessageContent::PortPeerChangedBlock => {
 
            // The peer of our port has just changed. So we are asked to
 
@@ -656,7 +655,7 @@ pub(crate) fn default_handle_control_message(
 
            default_handle_recently_unblocked_port(
 
                exec_state, control, consensus, port_handle, sched_ctx,
 
                comp_ctx, inbox_main, inbox_backup
 
            );
 
            )?;
 
        }
 
    }
 

	
 
@@ -1105,20 +1104,43 @@ fn send_message_without_ports(
 
/// Prepares sending a message that contains ports. Only once a particular
 
/// protocol has completed (where we notify all the peers that the ports will
 
/// be transferred) will we actually send the message to the recipient.
 
fn prepare_send_message_with_ports(
 
fn start_send_message_with_ports(
 
    sending_port_id: PortId, sending_port_instruction: PortInstruction, value: ValueGroup,
 
    exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx,
 
    control: &mut ControlLayer
 
) -> Result<(), (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send
 

	
 
    // Retrieve ports we're going to transfer
 
    let sending_port_handle = comp_ctx.get_port_handle(sending_port_id);
 
    let sending_port_info = comp_ctx.get_port_mut(sending_port_handle);
 
    sending_port_info.last_instruction = sending_port_instruction;
 

	
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut transmit_ports);
 
    debug_assert!(!transmit_ports.is_empty()); // requisite for calling this function
 
    debug_assert!(!transmit_ports.is_empty()); // required from caller
 

	
 
    // Enter the state where we'll wait until all transferred ports are not
 
    // blocked.
 
    exec_state.set_as_blocked_put_with_ports(sending_port_id, value);
 

	
 
    if ports_not_blocked(comp_ctx, &transmit_ports) {
 
        // Ports are not blocked, so we can send them right away.
 
        perform_send_message_with_ports_notify_peers(
 
            exec_state, comp_ctx, sched_ctx, control, transmit_ports
 
        )?;
 
    } // else: wait until they become unblocked
 

	
 
    return Ok(())
 
}
 

	
 
fn perform_send_message_with_ports_notify_peers(
 
    exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx,
 
    control: &mut ControlLayer, transmit_ports: EncounteredPorts
 
) -> Result<(), (PortInstruction, String)> {
 
    // Check we're in the correct state in debug mode
 
    debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedTransferredPorts);
 
    debug_assert!(ports_not_blocked(comp_ctx, &transmit_ports));
 

	
 
    // Set up the final Ack that triggers us to send our final message
 
    let unblock_put_control_id = control.add_unblock_put_with_ports_entry();
 
@@ -1128,9 +1150,12 @@ fn prepare_send_message_with_ports(
 
        let peer_comp_id = transmit_port_info.peer_comp_id;
 
        let peer_port_id = transmit_port_info.peer_port_id;
 

	
 

	
 
        // Note: we checked earlier that we are currently in sync mode. Now we
 
        // will check if we've already used the port we're about to transmit.
 
        if !transmit_port_info.last_instruction.is_none() {
 
            let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
            let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction;
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports in this message, as it is used in this sync round")
 
@@ -1138,6 +1163,8 @@ fn prepare_send_message_with_ports(
 
        }
 

	
 
        if transmit_port_info.state.is_set(PortStateFlag::Transmitted) {
 
            let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
            let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction;
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports in this message, as that port is already transmitted")
 
@@ -1158,7 +1185,7 @@ fn prepare_send_message_with_ports(
 

	
 
    // We've set up the protocol, once all the PPC's are blocked we are supposed
 
    // to transfer the message to the recipient. So store it temporarily
 
    exec_state.set_as_blocked_put_with_ports(sending_port_id, value);
 
    exec_state.mode = CompMode::PutPortsBlockedAwaitingAcks;
 

	
 
    return Ok(());
 
}
 
@@ -1166,23 +1193,36 @@ fn prepare_send_message_with_ports(
 
/// Performs the transmission of a data message that contains ports. These were
 
/// all stored in the component's execution state by the
 
/// `prepare_send_message_with_ports` function. Port must be ready to send!
 
fn perform_send_message_with_ports(
 
fn perform_send_message_with_ports_to_receiver(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPortsReady);
 
) -> Result<(), (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedSendingPort);
 

	
 
    // Find all ports again
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports);
 

	
 
    // Retrieve the port over which we're going to send the message
 
    let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert!(port_info.state.can_send() && !port_info.state.is_blocked());
 

	
 
    if !port_info.state.is_open() {
 
        return Err((
 
            port_info.last_instruction,
 
            String::from("cannot send over this port, as it is closed")
 
        ));
 
    }
 

	
 
    debug_assert!(!port_info.state.is_blocked_due_to_port_change()); // caller should have checked this
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
    // Annotate the data message
 
    // Change state back to its default
 
    exec_state.mode = CompMode::Sync;
 
    let message_value = exec_state.mode_value.take();
 
    exec_state.mode_port = PortId::new_invalid();
 

	
 
    // Annotate the data message
 
    let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value);
 

	
 
    // And further enhance the message by adding data about the ports that are
 
@@ -1212,6 +1252,8 @@ fn perform_send_message_with_ports(
 
    // And finally, send the message to the peer
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles an `Ack` for the control layer.
 
@@ -1219,7 +1261,7 @@ fn default_handle_ack(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
) -> Result<(), (PortInstruction, String)>{
 
    // Since an `Ack` may cause another one, handle them in a loop
 
    let mut to_ack = control_id;
 

	
 
@@ -1249,13 +1291,17 @@ fn default_handle_ack(
 
                // Send the message (containing ports) stored in the component
 
                // execution state to the recipient
 
                println!("DEBUG: Unblocking put with ports");
 
                exec_state.mode = CompMode::BlockedPutPortsReady;
 
                debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks);
 
                exec_state.mode = CompMode::PutPortsBlockedSendingPort;
 
                let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 

	
 
                // Little bit of a hack, we didn't really unblock the sending
 
                // port, but this will mesh nicely with waiting for the sending
 
                // port to become unblocked.
 
                default_handle_recently_unblocked_port(
 
                    exec_state, control, consensus, port_handle, sched_ctx,
 
                    comp_ctx, inbox_main, inbox_backup
 
                );
 
                )?;
 
            },
 
            AckAction::None => {}
 
        }
 
@@ -1265,6 +1311,8 @@ fn default_handle_ack(
 
            None => break,
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Little helper for sending the most common kind of `Ack`
 
@@ -1291,21 +1339,17 @@ fn default_handle_recently_unblocked_port(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    let port_id = port_info.self_id;
 

	
 
    if port_info.state.is_blocked() {
 
        // Port is still blocked. We wait until the next control message where
 
        // we unblock the port.
 
        return;
 
        return Ok(());
 
    }
 

	
 
    if exec_state.is_blocked_on_put_without_ports(port_id) {
 
        // Return to the regular execution mode
 
        exec_state.mode = CompMode::Sync;
 
        exec_state.mode_port = PortId::new_invalid();
 

	
 
        // Annotate the message that we're going to send
 
        let port_info = comp_ctx.get_port(port_handle); // for immutable access
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
@@ -1317,18 +1361,25 @@ fn default_handle_recently_unblocked_port(
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        exec_state.mode_port = PortId::new_invalid();
 
    } else if exec_state.is_blocked_on_put_with_ports(port_id) {
 
        // Port is not blocked, and we've completed our part of the
 
        // port-transfer protocol. So send the message
 
        perform_send_message_with_ports(
 
            exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup
 
        );
 

	
 
        // Return to the regular execution mode
 
        exec_state.mode = CompMode::Sync;
 
        exec_state.mode_port = PortId::new_invalid();
 
        debug_assert!(exec_state.mode_value.values.is_empty());
 
    } else if exec_state.mode == CompMode::PutPortsBlockedTransferredPorts {
 
        // We are waiting until all of the transferred ports become unblocked,
 
        // check so here.
 
        let mut transfer_ports = Vec::new();
 
        find_ports_in_value_group(&exec_state.mode_value, &mut transfer_ports);
 
        if ports_not_blocked(comp_ctx, &transfer_ports) {
 
            perform_send_message_with_ports_notify_peers(
 
                exec_state, comp_ctx, sched_ctx, control, transfer_ports
 
            )?;
 
        }
 
    } else if exec_state.mode == CompMode::PutPortsBlockedSendingPort && exec_state.mode_port == port_id {
 
        // We checked above that the port became unblocked, so we can send the
 
        // message
 
        perform_send_message_with_ports_to_receiver(
 
            exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup
 
        )?;
 
    } else if exec_state.is_blocked_on_create_component() {
 
        let mut ports = Vec::new();
 
        find_ports_in_value_group(&exec_state.mode_value, &mut ports);
 
@@ -1338,6 +1389,8 @@ fn default_handle_recently_unblocked_port(
 
            );
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
#[inline]
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -126,8 +126,9 @@ impl Component for ComponentTcpClient {
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedSelect |
 
            CompMode::BlockedPutPortsAwaitingAcks |
 
            CompMode::BlockedPutPortsReady |
 
            CompMode::PutPortsBlockedTransferredPorts |
 
            CompMode::PutPortsBlockedAwaitingAcks |
 
            CompMode::PutPortsBlockedSendingPort |
 
            CompMode::NewComponentBlocked => {
 
                // Not possible: we never enter this state
 
                unreachable!();
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -285,8 +285,8 @@ impl Component for CompPDL {
 
                // continue and run PDL code
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedGet |
 
            CompMode::BlockedPut | CompMode::BlockedSelect |
 
            CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady |
 
            CompMode::BlockedPut | CompMode::BlockedSelect | CompMode::PutPortsBlockedTransferredPorts |
 
            CompMode::PutPortsBlockedAwaitingAcks | CompMode::PutPortsBlockedSendingPort |
 
            CompMode::NewComponentBlocked => {
 
                return CompScheduling::Sleep;
 
            }
src/runtime2/component/component_random.rs
Show inline comments
 
@@ -62,8 +62,11 @@ impl Component for ComponentRandomU32 {
 
        sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect |
 
            CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady |
 
            CompMode::BlockedGet |
 
            CompMode::BlockedSelect |
 
            CompMode::PutPortsBlockedTransferredPorts |
 
            CompMode::PutPortsBlockedAwaitingAcks |
 
            CompMode::PutPortsBlockedSendingPort |
 
            CompMode::NewComponentBlocked => {
 
                // impossible for this component, no input ports and no select
 
                // blocks
0 comments (0 inline, 0 general)