Files @ 644bbf1ed134
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component.rs

644bbf1ed134 16.2 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
WIP on TCP component implementation, changes to interface
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId};
use crate::protocol::*;
use crate::runtime2::*;
use crate::runtime2::communication::*;

use super::{CompCtx, CompPDL};
use super::component_context::*;
use super::component_random::*;
use super::control_layer::*;
use super::consensus::*;

/// Scheduling instruction a component hands back after (part of) its routine
/// has been executed; it is the success value of `Component::run`.
///
/// The common marker traits are derived so that the value can be compared,
/// copied and printed in logs/assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompScheduling {
    /// Returned by `default_handle_start_exit` so that the component is run
    /// again to check whether it can shut down immediately.
    Immediate,
    /// NOTE(review): not produced in this file; presumably asks the scheduler
    /// to put the component back into the work queue - confirm.
    Requeue,
    /// Returned by `default_handle_busy_exit` while `Ack`s are still
    /// outstanding.
    Sleep,
    /// Returned once the component has reached its exit mode.
    Exit,
}

/// Generic representation of a component (as viewed by a scheduler). Every
/// method receives the contexts it needs as arguments, so an implementor only
/// has to carry its own state.
pub(crate) trait Component {
    /// Called upon the creation of the component. Only the scheduler context
    /// is handed out here (immutably).
    fn on_creation(&mut self, sched_ctx: &SchedulerCtx);

    /// Called if the component is created by another component and the messages
    /// are being transferred between the two. `message` is one such
    /// transferred data message.
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage);

    /// Called if the component receives a new message. The component is
    /// responsible for deciding where that message goes.
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message);

    /// Called if the component's routine should be executed. The return value
    /// can be used to indicate when the routine should be run again; an
    /// `EvalError` is returned if running the routine failed.
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError>;
}

/// Representation of the generic operating mode of a component.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum CompMode {
    /// Not in sync mode.
    NonSync,
    /// In sync mode, can interact with other components.
    Sync,
    /// Awaiting a solution, i.e. encountered the end of the sync block.
    SyncEnd,
    /// Blocked because we need to receive a message on a particular port.
    BlockedGet,
    /// Component is blocked because the port is blocked.
    BlockedPut,
    /// Waiting on a message to complete the select statement.
    BlockedSelect,
    /// Temporary state: if encountered then we start the shutdown process.
    StartExit,
    /// Temporary state: waiting for `Ack`s for all the closed ports.
    BusyExit,
    /// Exiting: shutdown process started, now waiting until the reference
    /// count drops to 0.
    Exit,
}

impl CompMode {
    pub(crate) fn is_in_sync_block(&self) -> bool {
        use CompMode::*;

        match self {
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true,
            NonSync | StartExit | BusyExit | Exit => false,
        }
    }
}

/// Component execution state: the execution mode along with some descriptive
/// fields. Fields are public for ergonomic reasons, use member functions when
/// appropriate.
pub(crate) struct CompExecState {
    /// Current operating mode of the component.
    pub mode: CompMode,
    /// Port the component is blocked on; only valid if blocked on a port
    /// (put/get).
    pub mode_port: PortId, // valid if blocked on a port (put/get)
    /// Value waiting to be sent; only valid if blocked on a put.
    pub mode_value: ValueGroup, // valid if blocked on a put
}

impl CompExecState {
    /// Builds the initial execution state: not in sync mode, no port that is
    /// being waited on and no pending value.
    pub(crate) fn new() -> Self {
        Self {
            mode: CompMode::NonSync,
            mode_port: PortId::new_invalid(),
            mode_value: ValueGroup::default(),
        }
    }

    /// Records that the component is blocked on a `get` from `port`. No value
    /// may be pending at this point (checked in debug builds).
    pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
        self.mode_port = port;
        self.mode = CompMode::BlockedGet;
        debug_assert!(self.mode_value.values.is_empty());
    }

    /// Whether the component is currently blocked on a `get` from exactly
    /// this `port`.
    pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool {
        matches!(self.mode, CompMode::BlockedGet) && self.mode_port == port
    }

    /// Records that the component is blocked on a `put` through `port`, and
    /// stores the `value` that still has to be sent.
    pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) {
        self.mode_value = value;
        self.mode_port = port;
        self.mode = CompMode::BlockedPut;
    }

    /// Whether the component is currently blocked on a `put` through exactly
    /// this `port`.
    pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool {
        matches!(self.mode, CompMode::BlockedPut) && self.mode_port == port
    }
}

/// Instantiates the component described by `definition_id`. A user-defined
/// definition is wrapped in a `CompPDL` that executes its PDL code (starting
/// from a fresh `Prompt`); a builtin definition is mapped onto its custom
/// component implementation. Port and message management is NOT handled here,
/// that remains the caller's job.
pub(crate) fn create_component(
    protocol: &ProtocolDescription,
    definition_id: ProcedureDefinitionId, type_id: TypeId,
    arguments: ValueGroup, num_ports: usize
) -> Box<dyn Component> {
    let definition = &protocol.heap[definition_id];
    debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite);

    // User-defined component: set up the PDL execution state
    if !definition.source.is_builtin() {
        let prompt = Prompt::new(
            &protocol.types, &protocol.heap,
            definition_id, type_id, arguments
        );
        return Box::new(CompPDL::new(prompt, num_ports));
    }

    // Builtin component: pick the matching custom implementation
    match definition.source {
        ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)),
        _ => unreachable!(),
    }
}

// -----------------------------------------------------------------------------
// Generic component messaging utilities (for sending and receiving)
// -----------------------------------------------------------------------------

/// Handles control messages in the default way. Note that this function may
/// take a lot of actions in the name of the caller: pending messages may be
/// sent, ports may become blocked/unblocked, etc. So the execution
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
/// state may all change.
///
/// Per message kind:
/// - `Ack`: forwarded to the control layer's `Ack` handling.
/// - `BlockPort` / `UnblockPort`: toggles a putter port between `Open` and
///   `BlockedDueToFullBuffers`; unblocking may send a pending `put`.
/// - `ClosePort`: closes the port and acknowledges, unless we were already
///   closing that port ourselves.
/// - `PortPeerChangedBlock` / `PortPeerChangedUnblock`: blocks the port while
///   its peer is being replaced, then installs the new peer and unblocks.
pub(crate) fn default_handle_control_message(
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
) {
    match message.content {
        ControlMessageContent::Ack => {
            // The message's ID identifies the control entry being acknowledged
            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
        },
        ControlMessageContent::BlockPort(port_id) => {
            // One of our messages was accepted, but the port should be
            // blocked.
            let port_handle = comp_ctx.get_port_handle(port_id);
            let port_info = comp_ctx.get_port(port_handle);
            debug_assert_eq!(port_info.kind, PortKind::Putter);
            if port_info.state == PortState::Open {
                // Only when open: we don't do this when closed, and we don't
                // do this if we're blocked due to peer changes
                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
            }
        },
        ControlMessageContent::ClosePort(port_id) => {
            // Request to close the port. We immediately comply and remove
            // the component handle as well
            let port_handle = comp_ctx.get_port_handle(port_id);
            let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);

            // One exception to sending an `Ack` is if we just closed the
            // port ourselves, meaning that the `ClosePort` messages got
            // sent to one another. In that case the peer's `ClosePort` is
            // treated as the `Ack` for our own close request.
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
            } else {
                // The `Ack` is sent before the peer is removed, as the peer
                // handle is needed to send it
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
                comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
            }
        },
        ControlMessageContent::UnblockPort(port_id) => {
            // We were previously blocked (or already closed). Only a port
            // that is blocked due to full buffers is unblocked here; any
            // other state is left untouched.
            let port_handle = comp_ctx.get_port_handle(port_id);
            let port_info = comp_ctx.get_port(port_handle);
            debug_assert_eq!(port_info.kind, PortKind::Putter);
            if port_info.state == PortState::BlockedDueToFullBuffers {
                default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
            }
        },
        ControlMessageContent::PortPeerChangedBlock(port_id) => {
            // The peer of our port has just changed. So we are asked to
            // temporarily block the port (while our original recipient is
            // potentially rerouting some of the in-flight messages) and
            // Ack. Then we wait for the `unblock` call.
            debug_assert_eq!(message.target_port_id, Some(port_id));
            let port_handle = comp_ctx.get_port_handle(port_id);
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange);

            // The `Ack` goes to the peer that is still registered on the port
            let port_info = comp_ctx.get_port(port_handle);
            let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);

            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
        },
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
            // Second half of the peer change: the port was blocked by the
            // `PortPeerChangedBlock` message. The affected port is carried
            // in the message's `target_port_id`.
            let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
            let port_info = comp_ctx.get_port(port_handle);
            debug_assert!(port_info.state == PortState::BlockedDueToPeerChange);
            let old_peer_id = port_info.peer_comp_id;

            // Let go of the old peer before registering the new one
            comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false);

            let port_info = comp_ctx.get_port_mut(port_handle);
            port_info.peer_comp_id = new_comp_id;
            port_info.peer_port_id = new_port_id;
            comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
            // Reopens the port and sends the pending `put`, if there is one
            default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
        }
    }
}

/// Handles a component initiating the exiting procedure, and closing all of its
/// ports. Should only be called once per component (which is ensured by
/// checking and modifying the mode in the execution state).
///
/// The mode must be `StartExit` upon entry and is moved to `BusyExit`. Every
/// port that is not yet closed is marked as closed and its peer is sent the
/// control message produced by `ControlLayer::initiate_port_closing`. Always
/// returns `CompScheduling::Immediate`.
pub(crate) fn default_handle_start_exit(
    exec_state: &mut CompExecState, control: &mut ControlLayer,
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
) -> CompScheduling {
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
    sched_ctx.log("Component starting exit");
    exec_state.mode = CompMode::BusyExit;

    // Iterating by index to work around borrowing rules
    for port_index in 0..comp_ctx.num_ports() {
        let port = comp_ctx.get_port_by_index_mut(port_index);
        if port.state == PortState::Closed {
            // Already closed, or in the process of being closed
            continue;
        }

        // Mark as closed. The ID is copied out such that the mutable borrow
        // of the port ends before `comp_ctx` is used again.
        let port_id = port.self_id;
        port.state = PortState::Closed;

        // Notify peer of closing
        let port_handle = comp_ctx.get_port_handle(port_id);
        let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx);
        let peer_info = comp_ctx.get_peer(peer);
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
    }

    return CompScheduling::Immediate; // to check if we can shut down immediately
}

/// Handles a component that is in the middle of exiting (i.e. after
/// `default_handle_start_exit` was called): as long as the control layer has
/// `Ack`s outstanding the component goes to sleep, otherwise its mode becomes
/// `Exit` and the component can be shut down.
pub(crate) fn default_handle_busy_exit(
    exec_state: &mut CompExecState, control: &ControlLayer,
    sched_ctx: &SchedulerCtx
) -> CompScheduling {
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);

    if !control.has_acks_remaining() {
        // Everyone acknowledged, nothing left to wait for
        sched_ctx.log("Component busy exiting, now shutting down");
        exec_state.mode = CompMode::Exit;
        return CompScheduling::Exit;
    }

    sched_ctx.log("Component busy exiting, still has `Ack`s remaining");
    CompScheduling::Sleep
}

/// Processes the (possibly absent) decision on the current synchronous round.
/// Returns `None` if no decision was reached yet, otherwise `Some(success)`
/// where `success` tells whether the round succeeded. On success the
/// component leaves sync mode and the consensus state is notified; on failure
/// the component is put into the `StartExit` mode.
pub(crate) fn default_handle_sync_decision(
    exec_state: &mut CompExecState, decision: SyncRoundDecision,
    consensus: &mut Consensus
) -> Option<bool> {
    // Checked regardless of the decision; the mode is not touched before the
    // branches below, so this single check covers every path.
    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);

    match decision {
        SyncRoundDecision::None => None,
        SyncRoundDecision::Solution => {
            exec_state.mode = CompMode::NonSync;
            consensus.notify_sync_decision(decision);
            Some(true)
        },
        SyncRoundDecision::Failure => {
            // NOTE(review): the consensus state is not notified on this path
            // - confirm that this is intended.
            exec_state.mode = CompMode::StartExit;
            Some(false)
        },
    }
}


/// Handles a component that already reached the `Exit` mode: all that is left
/// to do is to report `CompScheduling::Exit`. The state argument is only
/// inspected by the debug assertion.
#[inline]
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
    CompScheduling::Exit
}

// -----------------------------------------------------------------------------
// Internal messaging/state utilities
// -----------------------------------------------------------------------------

/// Feeds an `Ack` for `control_id` into the control layer and executes the
/// action the control layer asks for in response (sending a control message
/// or scheduling a component). Handling one `Ack` may yield a follow-up `Ack`,
/// so this keeps going until the control layer reports none.
fn default_handle_ack(
    control: &mut ControlLayer, control_id: ControlId,
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
) {
    let mut pending = Some(control_id);

    while let Some(current) = pending {
        let (action, follow_up) = control.handle_ack(current, sched_ctx, comp_ctx);

        match action {
            AckAction::None => {},
            AckAction::SendMessage(target_comp, message) => {
                // FIX @NoDirectHandle
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
                handle.send_message(&sched_ctx.runtime, Message::Control(message), true);

                // We may never be the last user of the handle
                let _should_remove = handle.decrement_users();
                debug_assert!(_should_remove.is_none());
            },
            AckAction::ScheduleComponent(to_schedule) => {
                // FIX @NoDirectHandle
                let mut handle = sched_ctx.runtime.get_component_public(to_schedule);

                // The component is intentionally not sleeping, hence it is
                // put into the work queue directly
                debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
                // NOTE(review): the invariant that makes `upgrade` sound is
                // not visible here - confirm and record it as a SAFETY note.
                let key = unsafe { to_schedule.upgrade() };
                sched_ctx.runtime.enqueue_work(key);

                let _should_remove = handle.decrement_users();
                debug_assert!(_should_remove.is_none());
            },
        }

        pending = follow_up;
    }
}

/// Sends the most common kind of `Ack` to the peer behind `peer_handle`: a
/// control message without a target port, carrying the ID of the control
/// message that is being acknowledged and this component as the sender.
fn default_send_ack(
    causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle,
    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
) {
    let ack = ControlMessage{
        id: causer_of_ack_id,
        sender_comp_id: comp_ctx.id,
        target_port_id: None,
        content: ControlMessageContent::Ack
    };

    let recipient = comp_ctx.get_peer(peer_handle);
    recipient.handle.send_message(&sched_ctx.runtime, Message::Control(ack), true);
}

/// Reopens a blocked putter port. If the component was blocked on a `put`
/// through exactly this port, then the value that was waiting is sent to the
/// port's peer and the component continues in sync mode.
fn default_handle_unblock_put(
    exec_state: &mut CompExecState, consensus: &mut Consensus,
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
) {
    // Open up the port, keeping the mutable borrow as short as possible
    let port_id = {
        let port_info = comp_ctx.get_port_mut(port_handle);
        let self_id = port_info.self_id;
        debug_assert!(port_info.state.is_blocked());
        port_info.state = PortState::Open;
        self_id
    };

    if !exec_state.is_blocked_on_put(port_id) {
        // No `put` waiting on this port
        return;
    }

    // Annotate the message that we're going to send (immutable port access
    // suffices from here on)
    let port_info = comp_ctx.get_port(port_handle);
    debug_assert_eq!(port_info.kind, PortKind::Putter);
    let pending_value = exec_state.mode_value.take();
    let data_message = consensus.annotate_data_message(comp_ctx, port_info, pending_value);

    // Look up the peer and hand over the message
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
    let recipient = comp_ctx.get_peer(peer_handle);
    recipient.handle.send_message(&sched_ctx.runtime, Message::Data(data_message), true);

    // A component can only be blocked on a `put` when it was in sync mode, so
    // that is the mode to return to
    exec_state.mode = CompMode::Sync;
    exec_state.mode_port = PortId::new_invalid();
}

#[inline]
pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
    return PortId(port_id.id);
}

#[inline]
pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
    return EvalPortId{ id: port_id.0 };
}