Changeset - ff87427e49f0
[Not reviewed]
0 2 0
MH - 3 years ago 2022-05-19 22:23:04
contact@maxhenger.nl
Remove unused field from port metadata
2 files changed with 1 insertions and 8 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -146,742 +146,739 @@ impl ExitReason {
 
        }
 
    }
 
}
 

	
 
/// Component execution state: the execution mode along with some descriptive
 
/// fields. Fields are public for ergonomic reasons, use member functions when
 
/// appropriate.
 
pub(crate) struct CompExecState {
 
    pub mode: CompMode,
 
    pub mode_port: PortId, // valid if blocked on a port (put/get)
 
    pub mode_value: ValueGroup, // valid if blocked on a put
 
    pub mode_component: (ProcedureDefinitionId, TypeId),
 
    pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode
 
}
 

	
 
impl CompExecState {
 
    pub(crate) fn new() -> Self {
 
        return Self{
 
            mode: CompMode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            mode_component: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()),
 
            exit_reason: ExitReason::Termination,
 
        }
 
    }
 

	
 
    pub(crate) fn set_as_start_exit(&mut self, reason: ExitReason) {
 
        self.mode = CompMode::StartExit;
 
        self.exit_reason = reason;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
 
        self.mode = CompMode::BlockedGet;
 
        self.mode_port = port;
 
        debug_assert!(self.mode_value.values.is_empty());
 
    }
 

	
 
    pub(crate) fn set_as_create_component_blocked(
 
        &mut self, proc_id: ProcedureDefinitionId, type_id: TypeId,
 
        arguments: ValueGroup
 
    ) {
 
        self.mode = CompMode::NewComponentBlocked;
 
        self.mode_value = arguments;
 
        self.mode_component = (proc_id, type_id);
 
    }
 

	
 
    pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedGet &&
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_put_without_ports(&mut self, port: PortId, value: ValueGroup) {
 
        self.mode = CompMode::BlockedPut;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_put_with_ports(&mut self, port: PortId, value: ValueGroup) {
 
        self.mode = CompMode::PutPortsBlockedTransferredPorts;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_put_without_ports(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedPut &&
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_create_component(&self) -> bool {
 
        return self.mode == CompMode::NewComponentBlocked;
 
    }
 
}
 

	
 
// TODO: Replace when implementing port sending. Should probably be incorporated
 
//  into CompCtx (and rename CompCtx into CompComms)
 
pub(crate) type InboxMain = Vec<Option<DataMessage>>;
 
pub(crate) type InboxMainRef = [Option<DataMessage>];
 
pub(crate) type InboxBackup = Vec<DataMessage>;
 

	
 
/// Creates a new component based on its definition. Meaning that if it is a
 
/// user-defined component then we set up the PDL code state. Otherwise we
 
/// construct a custom component. This does NOT take care of port and message
 
/// management.
 
pub(crate) fn create_component(
 
    protocol: &ProtocolDescription,
 
    definition_id: ProcedureDefinitionId, type_id: TypeId,
 
    arguments: ValueGroup, num_ports: usize
 
) -> Box<dyn Component> {
 
    let definition = &protocol.heap[definition_id];
 
    debug_assert_eq!(definition.kind, ProcedureKind::Component);
 

	
 
    if definition.source.is_builtin() {
 
        // Builtin component
 
        let component: Box<dyn Component> = match definition.source {
 
            ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)),
 
            ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)),
 
            ProcedureSource::CompTcpListener => Box::new(ComponentTcpListener::new(arguments)),
 
            _ => unreachable!(),
 
        };
 

	
 
        return component;
 
    } else {
 
        // User-defined component
 
        let prompt = Prompt::new(
 
            &protocol.types, &protocol.heap,
 
            definition_id, type_id, arguments
 
        );
 
        let component = CompPDL::new(prompt, num_ports);
 
        return Box::new(component);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Generic component messaging utilities (for sending and receiving)
 
// -----------------------------------------------------------------------------
 

	
 
/// Default handling of sending a data message. In case the port is blocked then
 
/// the `ExecState` will become blocked as well. Note that
 
/// `default_handle_control_message` will ensure that the port becomes
 
/// unblocked if so instructed by the receiving component. The returned
 
/// scheduling value must be used.
 
#[must_use]
 
pub(crate) fn default_send_data_message(
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId,
 
    port_instruction: PortInstruction, value: ValueGroup,
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
    control: &mut ControlLayer, comp_ctx: &mut CompCtx
 
) -> Result<CompScheduling, (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 

	
 
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 
    port_info.last_registered_round = Some(consensus.round_number());
 

	
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 

	
 
    let mut ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut ports);
 

	
 
    if port_info.state.is_closed() {
 
        // Note: normally peer is eventually consistent, but if it has shut down
 
        // then we can be sure it is consistent (I think?)
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
 
        ))
 
    } else if !ports.is_empty() {
 
        start_send_message_with_ports(
 
            transmitting_port_id, port_instruction, value, exec_state,
 
            comp_ctx, sched_ctx, control
 
        )?;
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put_without_ports(transmitting_port_id, value);
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else {
 
        // Port is not blocked and no ports to transfer: send to the peer
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 

	
 
        return Ok(CompScheduling::Immediate);
 
    }
 
}
 

	
 
pub(crate) enum IncomingData {
 
    PlacedInSlot,
 
    SlotFull(DataMessage),
 
}
 

	
 
/// Default handling of receiving a data message. In case there is no room for
 
/// the message it is returned from this function. Note that this function is
 
/// different from PDL code performing a `get` on a port; this is the case where
 
/// the message first arrives at the component.
 
// NOTE: This is supposed to be a somewhat temporary implementation. It would be
 
//  nicest if the sending component can figure out it cannot send any more data.
 
#[must_use]
 
pub(crate) fn default_handle_incoming_data_message(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMain,
 
    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
 
    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> IncomingData {
 
    let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    comp_ctx.get_port_mut(port_handle).received_message_for_sync = true;
 
    let port_value_slot = &mut inbox_main[port_index];
 
    let target_port_id = incoming_message.data_header.target_port;
 

	
 
    if port_value_slot.is_none() {
 
        // We can put the value in the slot
 
        *port_value_slot = Some(incoming_message);
 

	
 
        // Check if we're blocked on receiving this message.
 
        dbg_code!({
 
            // Our port cannot have been blocked itself, because we're able to
 
            // directly insert the message into its slot.
 
            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
 
        });
 

	
 
        if exec_state.is_blocked_on_get(target_port_id) {
 
            // Return to normal operation
 
            exec_state.mode = CompMode::Sync;
 
            exec_state.mode_port = PortId::new_invalid();
 
            debug_assert!(exec_state.mode_value.values.is_empty());
 
        }
 

	
 
        return IncomingData::PlacedInSlot
 
    } else {
 
        // Slot is already full, so if the port was previously opened, it will
 
        // now become closed
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        if port_info.state.is_open() {
 
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
            let (peer_handle, message) =
 
                control.initiate_port_blocking(comp_ctx, port_handle);
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
        }
 

	
 
        return IncomingData::SlotFull(incoming_message)
 
    }
 
}
 

	
 
pub(crate) enum GetResult {
 
    Received(DataMessage),
 
    NoMessage,
 
    Error((PortInstruction, String)),
 
}
 

	
 
/// Default attempt at trying to receive from a port (i.e. through a `get`, or
 
/// the equivalent operation for a builtin component). `target_port` is the port
 
/// we're trying to receive from, and the `target_port_instruction` is the
 
/// instruction we're attempting on this port.
 
pub(crate) fn default_attempt_get(
 
    exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx,
 
    comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus
 
) -> GetResult {
 
    let port_handle = comp_ctx.get_port_handle(target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 

	
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = target_port_instruction;
 
    if port_info.state.is_closed() {
 
        let peer_id = port_info.peer_comp_id;
 
        return GetResult::Error((
 
            target_port_instruction,
 
            format!("Cannot get from this port, as the peer component (id:{}) closed the port", peer_id.0)
 
        ));
 
    }
 

	
 
    if let Some(message) = &inbox_main[port_index] {
 
        if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
            // We're allowed to receive this message
 
            let mut message = inbox_main[port_index].take().unwrap();
 
            debug_assert_eq!(target_port, message.data_header.target_port);
 

	
 
            // Note: we can still run into an unrecoverable error when actually
 
            // receiving this message
 
            match default_handle_received_data_message(
 
                target_port, target_port_instruction,
 
                &mut message, inbox_main, inbox_backup,
 
                comp_ctx, sched_ctx, control, consensus
 
            ) {
 
                Ok(()) => return GetResult::Received(message),
 
                Err(location_and_message) => return GetResult::Error(location_and_message)
 
            }
 
        } else {
 
            // We're not allowed to receive this message. This means that the
 
            // receiver is attempting to receive something out of order with
 
            // respect to the sender.
 
            return GetResult::Error((target_port_instruction, String::from(
 
                "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s"
 
            )));
 
        }
 
    } else {
 
        // We don't have a message waiting for us and the port is not blocked.
 
        // So enter the BlockedGet state
 
        exec_state.set_as_blocked_get(target_port);
 
        return GetResult::NoMessage;
 
    }
 
}
 

	
 
/// Default handling that has been received through a `get`. Will check if any
 
/// more messages are waiting, and if the corresponding port was blocked because
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer,
 
    consensus: &mut Consensus
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it
 

	
 
    // If we received any ports, add them to the port tracking and inbox struct.
 
    // Then notify the peers that they can continue sending to this port, but
 
    // now at a new address.
 
    for received_port in &mut message.ports {
 
        // Create a new port locally
 
        let mut new_port_state = received_port.state;
 
        new_port_state.set(PortStateFlag::Received);
 
        let new_port_handle = comp_ctx.add_port(
 
            received_port.peer_comp, received_port.peer_port,
 
            received_port.kind, new_port_state
 
        );
 
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp));
 
        let new_port = comp_ctx.get_port(new_port_handle);
 

	
 
        // Transfer messages to main/backup inbox. Make sure to modify the
 
        // target port to our own
 
        for message in received_port.messages.iter_mut() {
 
            message.data_header.target_port = new_port.self_id;
 
        }
 

	
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
            inbox_backup.extend(received_port.messages.drain(..));
 
        } else {
 
            inbox_main.push(None);
 
        }
 

	
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 

	
 
        // Add the port tho the consensus
 
        consensus.notify_of_new_port(_new_inbox_index, new_port_handle, comp_ctx);
 

	
 
        // Replace all references to the port in the received message
 
        for message_location in received_port.locations.iter().copied() {
 
            let value = message.content.get_value_mut(message_location);
 

	
 
            match value {
 
                Value::Input(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Getter);
 
                    *value = Value::Input(port_id_to_eval(new_port.self_id));
 
                },
 
                Value::Output(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Putter);
 
                    *value = Value::Output(port_id_to_eval(new_port.self_id));
 
                },
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // Let the peer know that the port can now be used
 
        let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
            id: ControlId::new_invalid(),
 
            sender_comp_id: comp_ctx.id,
 
            target_port_id: Some(new_port.peer_port_id),
 
            content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id)
 
        }), true);
 
    }
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
 
    debug_assert!(port_info.state.is_open()); // checked by caller
 
    port_info.last_registered_round = Some(message.sync_header.sync_round);
 

	
 
    // Check if there are any more messages in the backup buffer
 
    for message_index in 0..inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == targeted_port {
 
            // One more message, place it in the slot
 
            let message = inbox_backup.remove(message_index);
 
            debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup
 
            inbox_main[port_index] = Some(message);
 

	
 
            return Ok(());
 
        }
 
    }
 

	
 
    // Did not have any more messages, so if we were blocked, then we need to
 
    // unblock the port now (and inform the peer of this unblocking)
 
    if port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers) {
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles control messages in the default way. Note that this function may
 
/// take a lot of actions in the name of the caller: pending messages may be
 
/// sent, ports may become blocked/unblocked, etc. So the execution
 
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
 
/// state may all change.
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Result<(), (PortInstruction, String)> {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?;
 
        },
 
        ControlMessageContent::BlockPort => {
 
            // One of our messages was accepted, but the port should be
 
            // blocked.
 
            let port_to_block = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_block);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
 
        },
 
        ControlMessageContent::ClosePort(content) => {
 
            // Request to close the port. We immediately comply and remove
 
            // the component handle as well
 
            let port_to_close = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_close);
 

	
 
            // We're closing the port, so we will always update the peer of the
 
            // port (in case of error messages)
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = message.sender_comp_id;
 
            port_info.close_at_sync_end = true; // might be redundant (we might set it closed now)
 

	
 
            let peer_comp_id = port_info.peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?;
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let last_registered_round = port_info.last_registered_round;
 
                let last_instruction = port_info.last_instruction;
 
                let port_has_had_message = port_info.received_message_for_sync;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                // Handle any possible error conditions (which boil down to: the
 
                // port has been used, but the peer has died). If not in sync
 
                // mode then we close the port immediately.
 

	
 
                // Note that `port_was_used` does not mean that any messages
 
                // were actually received. It might also mean that e.g. the
 
                // component attempted a `get`, but there were no messages, so
 
                // now it is in the `BlockedGet` state.
 
                let port_was_used = last_instruction != PortInstruction::None;
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let round_has_succeeded = !content.closed_in_sync_round && last_registered_round == content.registered_round;
 
                    let closed_during_sync_round = content.closed_in_sync_round;
 
                    let closed_before_sync_round = ! closed_during_sync_round && !round_has_succeeded;
 
                    let closed_before_sync_round = !closed_during_sync_round && !round_has_succeeded;
 

	
 
                    if (closed_during_sync_round || closed_before_sync_round) && port_was_used {
 
                        return Err((
 
                            last_instruction,
 
                            format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0)
 
                        ));
 
                    }
 
                } else {
 
                    let port_info = comp_ctx.get_port_mut(port_handle);
 
                    port_info.state.set(PortStateFlag::Closed);
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort => {
 
            // We were previously blocked (or already closed)
 
            let port_to_unblock = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_unblock);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 

	
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers));
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 
            default_handle_recently_unblocked_port(
 
                exec_state, control, consensus, port_handle, sched_ctx,
 
                comp_ctx, inbox_main, inbox_backup
 
            )?;
 
        },
 
        ControlMessageContent::PortPeerChangedBlock => {
 
            // The peer of our port has just changed. So we are asked to
 
            // temporarily block the port (while our original recipient is
 
            // potentially rerouting some of the in-flight messages) and
 
            // Ack. Then we wait for the `unblock` call.
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
            port_info.state.set(PortStateFlag::BlockedDueToPeerChange);
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange));
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_port_id = new_port_id;
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToPeerChange);
 
            comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id));
 
            default_handle_recently_unblocked_port(
 
                exec_state, control, consensus, port_handle, sched_ctx,
 
                comp_ctx, inbox_main, inbox_backup
 
            )?;
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles a component entering the synchronous block. Will ensure that the
 
/// `Consensus` and the `ComponentCtx` are initialized properly.
 
pub(crate) fn default_handle_sync_start(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component starting sync mode");
 

	
 
    // If any messages are present for this sync round, set the appropriate flag
 
    // and notify the consensus handler of the present messages
 
    consensus.notify_sync_start(comp_ctx);
 
    for (port_index, message) in inbox_main.iter().enumerate() {
 
        if let Some(message) = message {
 
            consensus.handle_incoming_data_message(comp_ctx, message);
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            port_info.received_message_for_sync = true;
 
        }
 
    }
 

	
 
    // Modify execution state
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 
    exec_state.mode = CompMode::Sync;
 
}
 

	
 
/// Handles a component that has reached the end of the sync block. This does
 
/// not necessarily mean that the component will go into the `NonSync` mode, as
 
/// it might have to wait for the leader to finish the round for everyone (see
 
/// `default_handle_sync_decision`)
 
pub(crate) fn default_handle_sync_end(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component ending sync mode (but possibly waiting for a solution)");
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 
    let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
    exec_state.mode = CompMode::SyncEnd;
 
    default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
/// ports. Should only be called once per component (which is ensured by
 
/// checking and modifying the mode in the execution state).
 
#[must_use]
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
        if port_info.state.is_blocked() {
 
            return CompScheduling::Sleep;
 
        }
 
    }
 

	
 
    sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason));
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
 
    // If exiting while inside sync mode, report to the leader of the current
 
    // round that we've failed.
 
    if exit_inside_sync {
 
        let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx);
 
        default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
    }
 

	
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 

	
 
        // Mark as closed
 
        let port_id = port.self_id;
 
        port.state.set(PortStateFlag::Closed);
 

	
 
        // Notify peer of closing
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
}
 

	
 
/// Handles a component waiting until all peers are notified that it is quitting
 
/// (i.e. after calling `default_handle_start_exit`).
 
#[must_use]
 
pub(crate) fn default_handle_busy_exit(
 
    exec_state: &mut CompExecState, control: &ControlLayer,
 
    sched_ctx: &SchedulerCtx
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);
 
    if control.has_acks_remaining() {
 
        sched_ctx.info("Component busy exiting, still has `Ack`s remaining");
 
        return CompScheduling::Sleep;
 
    } else {
 
        sched_ctx.info("Component busy exiting, now shutting down");
 
        exec_state.mode = CompMode::Exit;
 
        return CompScheduling::Exit;
 
    }
 
}
 

	
 
/// Handles a potential synchronous round decision. If there was a decision then
 
/// the `Some(success)` value indicates whether the round succeeded or not.
 
/// Might also end up changing the `ExecState`.
 
///
 
/// Might be called in two cases:
 
/// 1. The component is in regular execution mode, at the end of a sync round,
 
///     and is waiting for a solution to the round.
 
/// 2. The component has encountered an error during a sync round and is
 
///     exiting, hence is waiting for a "Failure" message from the leader.
 
pub(crate) fn default_handle_sync_decision(
 
    sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx,
 
    decision: SyncRoundDecision, consensus: &mut Consensus
 
) -> Option<bool> {
 
    let success = match decision {
 
        SyncRoundDecision::None => return None,
 
        SyncRoundDecision::Solution => true,
 
        SyncRoundDecision::Failure => false,
 
    };
 

	
 
    debug_assert!(
 
        exec_state.mode == CompMode::SyncEnd || (
 
            exec_state.mode.is_busy_exiting() && exec_state.exit_reason.is_error()
 
        ) || (
 
            exec_state.mode.is_in_sync_block() && decision == SyncRoundDecision::Failure
 
        )
 
    );
 

	
 
    sched_ctx.info(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode));
 
    consensus.notify_sync_decision(decision);
 
    if success {
 
        // We cannot get a success message if the component has encountered an
 
        // error.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            if port_info.close_at_sync_end {
 
                port_info.state.set(PortStateFlag::Closed);
 
            }
 
            port_info.state.clear(PortStateFlag::Received);
 
        }
 
        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
        exec_state.mode = CompMode::NonSync;
 
        return Some(true);
 
    } else {
 
        // We may get failure both in all possible cases. But we should only
 
        // modify the execution state if we're not already in exit mode
 
        if !exec_state.mode.is_busy_exiting() {
 
            sched_ctx.error("failed synchronous round, initiating exit");
 
            exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
        }
 
        return Some(false);
 
    }
 
}
 

	
 
/// Special component creation function. This function assumes that the
 
/// transferred ports are NOT blocked, and that the channels to whom the ports
 
/// belong are fully owned by the creating component. This will be checked in
 
/// debug mode.
 
pub(crate) fn special_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    component_instance: Box<dyn Component>, component_ports: Vec<PortId>,
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 
    let reservation = sched_ctx.runtime.start_create_component();
 
    let mut created_ctx = CompCtx::new(&reservation);
 
    let mut port_pairs = Vec::new();
 

	
 
    // Retrieve ports
 
    for instantiator_port_id in component_ports.iter() {
 
        let instantiator_port_id = *instantiator_port_id;
 
        let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id);
 
        let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle);
 

	
 
        // Check if conditions for calling this function are valid
 
        debug_assert!(!instantiator_port.state.is_blocked_due_to_port_change());
 
        debug_assert_eq!(instantiator_port.peer_comp_id, instantiator_ctx.id);
 

	
 
        // Create port at new component
 
        let created_port_handle = created_ctx.add_port(
 
            instantiator_port.peer_comp_id, instantiator_port.peer_port_id,
 
            instantiator_port.kind, instantiator_port.state
 
        );
 
        let created_port = created_ctx.get_port(created_port_handle);
 
        let created_port_id = created_port.self_id;
 

	
 
        // Store in port pairs
 
        let is_open = instantiator_port.state.is_open();
 
        port_pairs.push(PortPair{
 
            instantiator_id: instantiator_port_id,
 
            instantiator_handle: instantiator_port_handle,
 
            created_id: created_port_id,
 
            created_handle: created_port_handle,
 
            is_open,
 
        });
 
    }
 

	
 
    // Set peer of the port for the new component
 
    for pair in port_pairs.iter() {
 
        let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle);
 
        let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
        // Note: we checked above (in debug mode) that the peer of the port is
src/runtime2/component/component_context.rs
Show inline comments
 
use std::fmt::{Debug, Formatter, Result as FmtResult};
 

	
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use crate::protocol::ExpressionId;
 

	
 
/// Helper struct to remember when the last operation on the port took place.
 
#[derive(Debug, PartialEq, Copy, Clone)]
 
pub enum PortInstruction {
 
    None,
 
    NoSource,
 
    SourceLocation(ExpressionId),
 
}
 

	
 
impl PortInstruction {
 
    pub fn is_none(&self) -> bool {
 
        match self {
 
            PortInstruction::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
/// Directionality of a port
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
/// Bitflags for port
 
// TODO: Incorporate remaining flags from `Port` struct
 
#[repr(u32)]
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortStateFlag {
 
    Closed = 0x01, // If not closed, then the port is open
 
    BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked
 
    BlockedDueToFullBuffers = 0x04,
 
    Transmitted = 0x08, // Transmitted, so cannot be used anymore
 
    Received = 0x10, // Received, so cannot be used yet, only after the sync round
 
}
 

	
 
#[derive(Copy, Clone)]
 
pub struct PortState {
 
    flags: u32
 
}
 

	
 
impl PortState {
 
    pub(crate) fn new() -> PortState {
 
        return PortState{ flags: 0 }
 
    }
 

	
 
    // high-level
 

	
 
    #[inline]
 
    pub fn is_open(&self) -> bool {
 
        return !self.is_closed();
 
    }
 

	
 
    #[inline]
 
    pub fn can_send(&self) -> bool {
 
        return
 
            !self.is_set(PortStateFlag::Closed) &&
 
            !self.is_set(PortStateFlag::Transmitted) &&
 
            !self.is_set(PortStateFlag::Received);
 
    }
 

	
 
    #[inline]
 
    pub fn is_closed(&self) -> bool {
 
        return self.is_set(PortStateFlag::Closed);
 
    }
 

	
 
    #[inline]
 
    pub fn is_blocked(&self) -> bool {
 
        return
 
            self.is_set(PortStateFlag::BlockedDueToPeerChange) ||
 
            self.is_set(PortStateFlag::BlockedDueToFullBuffers);
 
    }
 

	
 
    #[inline]
 
    pub fn is_blocked_due_to_port_change(&self) -> bool {
 
        return self.is_set(PortStateFlag::BlockedDueToPeerChange);
 
    }
 

	
 
    // lower-level utils
 
    #[inline]
 
    pub fn set(&mut self, flag: PortStateFlag) {
 
        self.flags |= flag as u32;
 
    }
 

	
 
    #[inline]
 
    pub fn clear(&mut self, flag: PortStateFlag) {
 
        self.flags &= !(flag as u32);
 
    }
 

	
 
    #[inline]
 
    pub const fn is_set(&self, flag: PortStateFlag) -> bool {
 
        return (self.flags & (flag as u32)) != 0;
 
    }
 
}
 

	
 
impl Debug for PortState {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
 
        use PortStateFlag::*;
 

	
 
        let mut s = f.debug_struct("PortState");
 
        for (flag_name, flag_value) in &[
 
            ("closed", Closed),
 
            ("blocked_peer_change", BlockedDueToPeerChange),
 
            ("blocked_full_buffers", BlockedDueToFullBuffers),
 
            ("transmitted", Transmitted),
 
        ] {
 
            s.field(flag_name, &self.is_set(*flag_value));
 
        }
 

	
 
        return s.finish();
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    // Identifiers
 
    pub self_id: PortId,
 
    pub peer_comp_id: CompId, // eventually consistent
 
    pub peer_port_id: PortId, // eventually consistent
 
    // Generic operating state
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    // State tracking for error detection and error handling
 
    pub last_registered_round: Option<u32>,
 
    pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors
 
    pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors
 
    pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message
 
    pub(crate) associated_with_peer: bool,
 
}
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
/// Port and peer management structure. Will keep a local reference counter to
 
/// the ports associate with peers, additionally manages the atomic reference
 
/// counter associated with the peers' component handles.
 
pub struct CompCtx {
 
    pub id: CompId,
 
    ports: Vec<Port>,
 
    peers: Vec<Peer>,
 
    port_id_counter: u32,
 
}
 

	
 
#[derive(Copy, Clone, PartialEq, Eq)]
 
pub struct LocalPortHandle(PortId);
 

	
 
#[derive(Copy, Clone)]
 
pub struct LocalPeerHandle(CompId);
 

	
 
impl CompCtx {
 
    /// Creates a new component context based on a reserved entry in the
 
    /// component store. This reservation is used such that we already know our
 
    /// assigned ID.
 
    pub(crate) fn new(reservation: &CompReserved) -> Self {
 
        return Self{
 
            id: reservation.id(),
 
            ports: Vec::new(),
 
            peers: Vec::new(),
 
            port_id_counter: 0,
 
        }
 
    }
 

	
 
    /// Creates a new channel that is fully owned by the component associated
 
    /// with this context.
 
    pub(crate) fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_port_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_port_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Adds a new port. Make sure to call `change_peer` afterwards.
 
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
 
    }
 

	
 
    /// Adds a self-reference. Called by the runtime/scheduler
 
    pub(crate) fn add_self_reference(&mut self, self_handle: CompHandle) {
 
        debug_assert_eq!(self.id, self_handle.id());
 
        debug_assert!(self.get_peer_index_by_id(self.id).is_none());
 
        self.peers.push(Peer{
 
            id: self.id,
 
            num_associated_ports: 0,
 
            handle: self_handle
 
        });
 
    }
 

	
 
    /// Removes a self-reference. Called by the runtime/scheduler
 
    pub(crate) fn remove_self_reference(&mut self) -> Option<CompKey> {
 
        let self_index = self.get_peer_index_by_id(self.id).unwrap();
 
        let peer = &mut self.peers[self_index];
 
        let maybe_comp_key = peer.handle.decrement_users();
 
        self.peers.remove(self_index);
 

	
 
        return maybe_comp_key;
 
    }
 

	
 
    /// Removes a port. Make sure you called `change_peer` first.
 
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
 
        let port_index = self.must_get_port_index(port_handle);
 
        let port = self.ports.remove(port_index);
 
        dbg_code!(assert!(!port.associated_with_peer));
 
        return port;
 
    }
 

	
 
    /// Changes a peer
 
    pub(crate) fn change_port_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, new_peer_comp_id: Option<CompId>) {
 
        // If port is currently associated with a peer, then remove that peer
 
        let port_index = self.get_port_index(port_handle);
 
        let port = &mut self.ports[port_index];
 
        let port_is_closed = port.state.is_closed();
 
        if port.associated_with_peer {
 
            // Remove old peer association
 
            port.associated_with_peer = false;
 
            let peer_comp_id = port.peer_comp_id;
 
            let peer_index = self.get_peer_index_by_id(peer_comp_id).unwrap();
 
            let peer = &mut self.peers[peer_index];
 

	
 
            peer.num_associated_ports -= 1;
 
            if peer.num_associated_ports == 0 {
 
                let mut peer = self.peers.remove(peer_index);
 
                if let Some(key) = peer.handle.decrement_users() {
 
                    sched_ctx.runtime.destroy_component(key);
 
                }
 
            }
 
        }
 

	
 
        // If there is a new peer, then set it as the peer associated with the
 
        // port
 
        if let Some(peer_id) = new_peer_comp_id {
 
            let port = &mut self.ports[port_index];
 
            port.peer_comp_id = peer_id;
 

	
 
            if peer_id != self.id && !port_is_closed {
 
                port.associated_with_peer = true;
 

	
 
                match self.get_peer_index_by_id(peer_id) {
 
                    Some(index) => {
 
                        let peer = &mut self.peers[index];
 
                        peer.num_associated_ports += 1;
 
                    },
 
                    None => {
 
                        let handle = sched_ctx.runtime.get_component_public(peer_id);
 
                        self.peers.push(Peer {
 
                            id: peer_id,
 
                            num_associated_ports: 1,
 
                            handle
 
                        })
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
 
        return LocalPortHandle(port_id);
 
    }
 

	
 
    // should perhaps be revised, used in main inbox
 
    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
 
        return self.must_get_port_index(port_handle);
 
    }
 

	
 
    pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle {
 
        return LocalPeerHandle(peer_id);
 
    }
 

	
 
    pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port {
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &self.peers[index];
 
    }
 

	
 
    pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &mut self.peers[index];
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_ports(&self) -> impl Iterator<Item=&Port> {
 
        return self.ports.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator<Item=&mut Port> {
 
        return self.ports.iter_mut();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_peers(&self) -> impl Iterator<Item=&Peer> {
 
        return self.peers.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn num_ports(&self) -> usize {
 
        return self.ports.len();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Local utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn get_peer_index_by_id(&self, comp_id: CompId) -> Option<usize> {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == comp_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn take_port_id(&mut self) -> u32 {
 
        let port_id = self.port_id_counter;
 
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
 
        return port_id;
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)