Changeset - 03c2c3f96250
[Not reviewed]
0 4 0
mh - 3 years ago 2022-05-11 11:39:49
contact@maxhenger.nl
Fix bug involving interaction between transmitting ports and blocking
4 files changed with 81 insertions and 34 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -69,45 +69,48 @@ pub(crate) trait Component {
 

	
 
/// Representation of the generic operating mode of a component. Although not
 
/// every state may be used by every kind of (builtin) component, this allows
 
/// writing standard handlers for particular events in a component's lifetime.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum CompMode {
 
    NonSync, // not in sync mode
 
    Sync, // in sync mode, can interact with other components
 
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
 
    BlockedGet, // blocked because we need to receive a message on a particular port
 
    BlockedPut, // component is blocked because the port is blocked
 
    BlockedSelect, // waiting on message to complete the select statement
 
    BlockedPutPorts,// blocked because we're waiting to send a data message containing ports
 
    BlockedPutPortsAwaitingAcks,// blocked because we're waiting to send a data message containing ports, but first need to receive Acks for the PortPeerChanged messages
 
    BlockedPutPortsReady, // blocked because we're waitingto send a data message containing ports
 
    StartExit, // temporary state: if encountered then we start the shutdown process.
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish
 
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 
}
 

	
 
impl CompMode {
 
    pub(crate) fn is_in_sync_block(&self) -> bool {
 
        use CompMode::*;
 

	
 
        match self {
 
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => true,
 
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect |
 
                BlockedPutPortsAwaitingAcks | BlockedPutPortsReady => true,
 
            NonSync | StartExit | BusyExit | Exit => false,
 
        }
 
    }
 

	
 
    pub(crate) fn is_busy_exiting(&self) -> bool {
 
        use CompMode::*;
 

	
 
        match self {
 
            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => false,
 
            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect |
 
                BlockedPutPortsAwaitingAcks | BlockedPutPortsReady => false,
 
            StartExit | BusyExit => true,
 
            Exit => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum ExitReason {
 
    Termination, // regular termination of component
 
    ErrorInSync,
 
    ErrorNonSync,
 
}
 
@@ -160,35 +163,47 @@ impl CompExecState {
 
    pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
 
        self.mode = CompMode::BlockedGet;
 
        self.mode_port = port;
 
        debug_assert!(self.mode_value.values.is_empty());
 
    }
 

	
 
    pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedGet &&
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) {
 
    pub(crate) fn set_as_blocked_put_without_ports(&mut self, port: PortId, value: ValueGroup) {
 
        self.mode = CompMode::BlockedPut;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool {
 
    pub(crate) fn set_as_blocked_put_with_ports(&mut self, port: PortId, value: ValueGroup) {
 
        self.mode = CompMode::BlockedPutPortsAwaitingAcks;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_put_without_ports(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedPut &&
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_put_with_ports(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedPutPortsReady &&
 
            self.mode_port == port;
 
    }
 
}
 

	
 
// TODO: Replace when implementing port sending. Should probably be incorporated
 
//  into CompCtx (and rename CompCtx into CompComms)
 
pub(crate) type InboxMain = Vec<Option<DataMessage>>;
 
pub(crate) type InboxMainRef = [Option<DataMessage>];
 
pub(crate) type InboxBackup = Vec<DataMessage>;
 

	
 
/// Creates a new component based on its definition. Meaning that if it is a
 
/// user-defined component then we set up the PDL code state. Otherwise we
 
/// construct a custom component. This does NOT take care of port and message
 
/// management.
 
@@ -236,58 +251,56 @@ pub(crate) fn default_send_data_message(
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
    control: &mut ControlLayer, comp_ctx: &mut CompCtx
 
) -> Result<CompScheduling, (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 

	
 
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 

	
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 

	
 
    let mut ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut ports);
 

	
 
    if port_info.state.is_closed() {
 
        // Note: normally peer is eventually consistent, but if it has shut down
 
        // then we can be sure it is consistent (I think?)
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
 
        ))
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put(transmitting_port_id, value);
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else {
 
        // Check if there are any ports that are being transmitted
 
        let mut ports = Vec::new();
 
        find_ports_in_value_group(&value, &mut ports);
 
        if !ports.is_empty() {
 
    } else if !ports.is_empty() {
 
        prepare_send_message_with_ports(
 
            transmitting_port_id, port_instruction, value, exec_state,
 
            comp_ctx, sched_ctx, control
 
        )?;
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put_without_ports(transmitting_port_id, value);
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else {
 
        // Port is not blocked and no ports to transfer: send to the peer
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 

	
 
        return Ok(CompScheduling::Immediate);
 
    }
 
}
 
}
 

	
 
pub(crate) enum IncomingData {
 
    PlacedInSlot,
 
    SlotFull(DataMessage),
 
}
 

	
 
/// Default handling of receiving a data message. In case there is no room for
 
/// the message it is returned from this function. Note that this function is
 
/// different from PDL code performing a `get` on a port; this is the case where
 
/// the message first arrives at the component.
 
// NOTE: This is supposed to be a somewhat temporary implementation. It would be
 
//  nicest if the sending component can figure out it cannot send any more data.
 
@@ -579,25 +592,28 @@ pub(crate) fn default_handle_control_message(
 
            }
 
        },
 
        ControlMessageContent::UnblockPort => {
 
            // We were previously blocked (or already closed)
 
            let port_to_unblock = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_unblock);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 

	
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers));
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 
            default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
            default_handle_recently_unblocked_port(
 
                exec_state, consensus, port_handle, sched_ctx, comp_ctx,
 
                inbox_main, inbox_backup
 
            );
 
        },
 
        ControlMessageContent::PortPeerChangedBlock => {
 
            // The peer of our port has just changed. So we are asked to
 
            // temporarily block the port (while our original recipient is
 
            // potentially rerouting some of the in-flight messages) and
 
            // Ack. Then we wait for the `unblock` call.
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
            port_info.state.set(PortStateFlag::BlockedDueToPeerChange);
 
@@ -607,25 +623,28 @@ pub(crate) fn default_handle_control_message(
 
        },
 
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange));
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_port_id = new_port_id;
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToPeerChange);
 
            comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id));
 
            default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
            default_handle_recently_unblocked_port(
 
                exec_state, consensus, port_handle, sched_ctx, comp_ctx,
 
                inbox_main, inbox_backup
 
            );
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles a component entering the synchronous block. Will ensure that the
 
/// `Consensus` and the `ComponentCtx` are initialized properly.
 
pub(crate) fn default_handle_sync_start(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) {
 
@@ -867,46 +886,45 @@ fn prepare_send_message_with_ports(
 

	
 
        // Block the peer of the port
 
        let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id);
 
        println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message);
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, message, true);
 
    }
 

	
 
    // We've set up the protocol, once all the PPC's are blocked we are supposed
 
    // to transfer the message to the recipient. So store it temporarily
 
    exec_state.mode = CompMode::BlockedPutPorts;
 
    exec_state.mode_port = sending_port_id;
 
    exec_state.mode_value = value;
 
    exec_state.set_as_blocked_put_with_ports(sending_port_id, value);
 

	
 
    return Ok(());
 
}
 

	
 
/// Performs the transmission of a data message that contains ports. These were
 
/// all stored in the component's execution state by the
 
/// `prepare_send_message_with_ports` function.
 
/// `prepare_send_message_with_ports` function. Port must be ready to send!
 
fn perform_send_message_with_ports(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPorts);
 
    debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPortsReady);
 

	
 
    // Find all ports again
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports);
 

	
 
    let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert!(port_info.state.can_send() && !port_info.state.is_blocked());
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
    // Annotate the data message
 
    let message_value = exec_state.mode_value.take();
 
    let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value);
 

	
 
    // And further enhance the message by adding data about the ports that are
 
    // being transferred
 
    for (port_locations, transmit_port_id) in transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(transmit_port_id);
 
        let transmit_port_info = comp_ctx.get_port(transmit_port_handle);
 

	
 
@@ -954,29 +972,34 @@ fn default_handle_ack(
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
 

	
 
                // Note that the component is intentionally not
 
                // sleeping, so we just wake it up
 
                debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
 
                let key = unsafe { to_schedule.upgrade() };
 
                sched_ctx.runtime.enqueue_work(key);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::UnblockPutWithPorts => {
 
                // Send the message (containing ports) to the recipient
 
                // Send the message (containing ports) stored in the component
 
                // execution state to the recipient
 
                println!("DEBUG: Unblocking put with ports");
 
                perform_send_message_with_ports(exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup);
 
                exec_state.mode = CompMode::Sync;
 
                exec_state.mode_port = PortId::new_invalid();
 
                exec_state.mode = CompMode::BlockedPutPortsReady;
 
                let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 

	
 
                default_handle_recently_unblocked_port(
 
                    exec_state, consensus, port_handle, sched_ctx, comp_ctx,
 
                    inbox_main, inbox_backup
 
                );
 
            },
 
            AckAction::None => {}
 
        }
 

	
 
        match new_to_ack {
 
            Some(new_to_ack) => to_ack = new_to_ack,
 
            None => break,
 
        }
 
    }
 
}
 

	
 
/// Little helper for sending the most common kind of `Ack`
 
@@ -985,51 +1008,71 @@ fn default_send_ack(
 
    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
 
) {
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
        id: causer_of_ack_id,
 
        sender_comp_id: comp_ctx.id,
 
        target_port_id: None,
 
        content: ControlMessageContent::Ack
 
    }), true);
 
}
 

	
 
/// Handles the unblocking of a putter port. In case there is a pending message
 
/// on that port then it will be sent.
 
/// on that port then it will be sent. There are two reasons for calling this
 
/// function: either a port was blocked (i.e. the Blocked state flag was
 
/// cleared), or the component is ready to send a message containing ports
 
/// (stored in the execution state). In this latter case we might still have
 
/// a blocked port.
 
fn default_handle_recently_unblocked_port(
 
    exec_state: &mut CompExecState, consensus: &mut Consensus,
 
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    let port_id = port_info.self_id;
 
    debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller
 

	
 
    if exec_state.is_blocked_on_put(port_id) {
 
    if port_info.state.is_blocked() {
 
        // Port is still blocked. We wait until the next control message where
 
        // we unblock the port.
 
        return;
 
    }
 

	
 
    if exec_state.is_blocked_on_put_without_ports(port_id) {
 
        // Return to the regular execution mode
 
        exec_state.mode = CompMode::Sync;
 
        exec_state.mode_port = PortId::new_invalid();
 

	
 
        // Annotate the message that we're going to send
 
        let port_info = comp_ctx.get_port(port_handle); // for immutable access
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        let to_send = exec_state.mode_value.take();
 
        let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send);
 

	
 
        // Retrieve peer to send the message
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        exec_state.mode_port = PortId::new_invalid();
 
    } else if exec_state.is_blocked_on_put_with_ports(port_id) {
 
        // Port is not blocked, and we've completed our part of the
 
        // port-transfer protocol. So send the message
 
        perform_send_message_with_ports(
 
            exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup
 
        );
 

	
 
        exec_state.mode = CompMode::Sync;
 
        exec_state.mode_port = PortId::new_invalid();
 
        debug_assert!(exec_state.mode_value.values.is_empty());
 
    }
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 
    return PortId(port_id.id);
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
    return EvalPortId{ id: port_id.0 };
 
}
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -116,25 +116,27 @@ impl Component for ComponentTcpClient {
 
                }
 
            },
 
            Message::Poll => {
 
                sched_ctx.info("Received polling event");
 
            },
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedSelect | CompMode::BlockedPutPorts => {
 
            CompMode::BlockedSelect |
 
            CompMode::BlockedPutPortsAwaitingAcks |
 
            CompMode::BlockedPutPortsReady => {
 
                // Not possible: we never enter this state
 
                unreachable!();
 
            },
 
            CompMode::NonSync => {
 
                // When in non-sync mode
 
                match &mut self.socket_state {
 
                    SocketState::Connected(_socket) => {
 
                        if self.sync_state == SyncState::FinishSyncThenQuit {
 
                            // Previous request was to let the component shut down
 
                            self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                        } else {
 
                            // Reset for a new request
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -275,26 +275,27 @@ impl Component for CompPDL {
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.info(&format!("Running component (mode: {:?})", self.exec_state.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.exec_state.mode {
 
            CompMode::NonSync | CompMode::Sync => {
 
                // continue and run PDL code
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut |
 
            CompMode::BlockedSelect | CompMode::BlockedPutPorts => {
 
            CompMode::SyncEnd | CompMode::BlockedGet |
 
            CompMode::BlockedPut | CompMode::BlockedSelect |
 
            CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady => {
 
                return CompScheduling::Sleep;
 
            }
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
 
            ),
 
            CompMode::BusyExit => return component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            ),
 
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx);
src/runtime2/component/component_random.rs
Show inline comments
 
@@ -53,25 +53,26 @@ impl Component for ComponentRandomU32 {
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => unreachable!(),
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect | CompMode::BlockedPutPorts => {
 
            CompMode::BlockedGet | CompMode::BlockedSelect |
 
            CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady => {
 
                // impossible for this component, no input ports and no select
 
                // blocks
 
                unreachable!();
 
            }
 
            CompMode::NonSync => {
 
                // If in non-sync mode then we check if the arguments make sense
 
                // (at some point in the future, this is just a testing
 
                // component).
 
                if self.random_minimum >= self.random_maximum {
 
                    // Could throw an evaluation error, but lets just panic
 
                    panic!("going to crash 'n burn your system now, please provide valid arguments");
 
                }
0 comments (0 inline, 0 general)