Files @ 2c1fa43903ac
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component.rs - annotation

2c1fa43903ac 15.2 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
Several unfinished attempts at introducing polling
aefbf606d736
7cf6df93d16d
5babd5401b1e
aefbf606d736
aefbf606d736
7cf6df93d16d
aefbf606d736
be8ea413a49a
aefbf606d736
aefbf606d736
5babd5401b1e
5babd5401b1e
5babd5401b1e
5babd5401b1e
5babd5401b1e
5babd5401b1e
5babd5401b1e
5babd5401b1e
93081320c9fa
5babd5401b1e
93081320c9fa
93081320c9fa
93081320c9fa
93081320c9fa
93081320c9fa
93081320c9fa
93081320c9fa
93081320c9fa
93081320c9fa
93081320c9fa
93081320c9fa
7cf6df93d16d
7cf6df93d16d
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
be8ea413a49a
be8ea413a49a
be8ea413a49a
be8ea413a49a
be8ea413a49a
be8ea413a49a
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
7cf6df93d16d
be8ea413a49a
be8ea413a49a
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
135965141596
c03e28261b5d
135965141596
135965141596
135965141596
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
c03e28261b5d
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
aefbf606d736
be8ea413a49a
be8ea413a49a
be8ea413a49a
be8ea413a49a
be8ea413a49a
be8ea413a49a
be8ea413a49a
be8ea413a49a
be8ea413a49a
be8ea413a49a
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId};
use crate::protocol::*;
use crate::runtime2::*;
use crate::runtime2::communication::*;

use super::{CompCtx, CompPDL};
use super::component_context::*;
use super::component_ip::*;
use super::control_layer::*;
use super::consensus::*;

/// Scheduling decision produced by a component's routine (see
/// `Component::run`), telling the scheduler what to do with the component
/// next.
///
/// The common value-type traits are derived so a decision can be logged,
/// compared and copied freely.
///
/// NOTE(review): the scheduler that consumes these values is not part of this
/// file. The per-variant notes below describe how the helpers in this file use
/// each variant; the exact scheduler semantics should be confirmed there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompScheduling {
    /// Returned by `default_handle_start_exit` so the component can check
    /// right away whether it can shut down. Presumably "run again
    /// immediately" -- confirm against the scheduler.
    Immediate,
    /// Not produced by any helper in this file. Presumably "put the component
    /// back into the work queue" -- TODO confirm against the scheduler.
    Requeue,
    /// Returned by `default_handle_busy_exit` while `Ack`s are still
    /// outstanding. Presumably the component is not run again until it is
    /// woken up -- confirm against the scheduler.
    Sleep,
    /// Returned once the component is shutting down (see
    /// `default_handle_busy_exit` and `default_handle_exit`).
    Exit,
}

/// Generic representation of a component (as viewed by a scheduler).
///
/// `create_component` hands out implementors of this trait as
/// `Box<dyn Component>`, so all interaction goes through the three methods
/// below.
pub(crate) trait Component {
    /// Called if the component is created by another component and the messages
    /// are being transferred between the two.
    ///
    /// * `comp_ctx` - component context handed in by the caller (presumably
    ///   the context of the adopting component -- confirm against the caller).
    /// * `message` - the data message that is handed over.
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage);

    /// Called if the component receives a new message. The component is
    /// responsible for deciding where that message goes.
    ///
    /// * `sched_ctx` - context of the scheduler that delivers the message.
    /// * `comp_ctx` - component context handed in by the caller.
    /// * `message` - the received message; `Message::Data` and
    ///   `Message::Control` are the kinds constructed in this file.
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message);

    /// Called if the component's routine should be executed. The return value
    /// can be used to indicate when the routine should be run again.
    ///
    /// Returns `Ok` with the scheduling decision, or an `EvalError`
    /// (presumably raised while evaluating the component's code -- confirm
    /// against the implementors).
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError>;
}

/// The generic operating mode a component can be in.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum CompMode {
    /// Not in sync mode.
    NonSync,
    /// In sync mode, can interact with other components.
    Sync,
    /// Awaiting a solution, i.e. encountered the end of the sync block.
    SyncEnd,
    /// Blocked because we need to receive a message on a particular port.
    BlockedGet,
    /// Component is blocked because the port is blocked.
    BlockedPut,
    /// Waiting on a message to complete the select statement.
    BlockedSelect,
    /// Temporary state: if encountered then we start the shutdown process.
    StartExit,
    /// Temporary state: waiting for `Ack`s for all the closed ports.
    BusyExit,
    /// Exiting: shutdown process started, now waiting until the reference
    /// count drops to 0.
    Exit,
}

impl CompMode {
    /// Tells whether this mode belongs to the inside of a sync block: the
    /// sync modes themselves plus every "blocked" mode. All other modes
    /// (non-sync and the exit stages) yield `false`.
    pub(crate) fn is_in_sync_block(&self) -> bool {
        // Deliberately exhaustive (no wildcard arm), so that adding a variant
        // forces a decision here.
        match *self {
            Self::NonSync
            | Self::StartExit
            | Self::BusyExit
            | Self::Exit => false,
            Self::Sync
            | Self::SyncEnd
            | Self::BlockedGet
            | Self::BlockedPut
            | Self::BlockedSelect => true,
        }
    }
}

/// Execution state of a component: its current mode together with the data
/// that qualifies that mode. The fields are public for ergonomic reasons;
/// prefer the member functions where one exists.
pub(crate) struct CompExecState {
    /// Current operating mode.
    pub mode: CompMode,
    /// Valid if blocked on a port (put/get).
    pub mode_port: PortId,
    /// Valid if blocked on a put.
    pub mode_value: ValueGroup,
}

impl CompExecState {
    /// Creates the initial state: non-sync mode, an invalid port and a
    /// default (empty) value group.
    pub(crate) fn new() -> Self {
        Self {
            mode: CompMode::NonSync,
            mode_port: PortId::new_invalid(),
            mode_value: ValueGroup::default(),
        }
    }

    /// Marks the component as blocked on a `get` for `port`. No value may be
    /// pending at this point (checked in debug builds).
    pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
        self.mode = CompMode::BlockedGet;
        self.mode_port = port;
        debug_assert!(self.mode_value.values.is_empty());
    }

    /// Returns `true` if the component is blocked on a `get` for exactly
    /// this `port`.
    pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool {
        matches!(self.mode, CompMode::BlockedGet) && self.mode_port == port
    }

    /// Marks the component as blocked on a `put` for `port`, storing the
    /// `value` that could not be sent yet.
    pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) {
        self.mode = CompMode::BlockedPut;
        self.mode_port = port;
        self.mode_value = value;
    }

    /// Returns `true` if the component is blocked on a `put` for exactly
    /// this `port`.
    pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool {
        matches!(self.mode, CompMode::BlockedPut) && self.mode_port == port
    }
}

/// Instantiates a component from its procedure definition.
///
/// A builtin definition yields the matching custom component; any other
/// definition is treated as user-defined and gets its PDL code state set up.
/// Port and message management is NOT handled here and remains the caller's
/// responsibility.
///
/// * `protocol` - description holding the heap and type table.
/// * `definition_id` / `type_id` - identify the procedure to instantiate.
/// * `arguments` - values passed to the new component.
/// * `num_ports` - forwarded to `CompPDL::new` for user-defined components.
pub(crate) fn create_component(
    protocol: &ProtocolDescription,
    definition_id: ProcedureDefinitionId, type_id: TypeId,
    arguments: ValueGroup, num_ports: usize
) -> Box<dyn Component> {
    let definition = &protocol.heap[definition_id];
    debug_assert!(
        definition.kind == ProcedureKind::Primitive ||
        definition.kind == ProcedureKind::Composite
    );

    // Builtin component: pick the implementation that belongs to the source.
    if definition.source.is_builtin() {
        let builtin: Box<dyn Component> = match definition.source {
            ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)),
            _ => unreachable!(),
        };
        return builtin;
    }

    // User-defined component: prepare the evaluator for its PDL code.
    let prompt = Prompt::new(
        &protocol.types, &protocol.heap,
        definition_id, type_id, arguments
    );
    Box::new(CompPDL::new(prompt, num_ports))
}

// -----------------------------------------------------------------------------
// Generic component messaging utilities (for sending and receiving)
// -----------------------------------------------------------------------------

/// Handles control messages in the default way. Note that this function may
/// take a lot of actions in the name of the caller: pending messages may be
/// sent, ports may become blocked/unblocked, etc. So the execution
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
/// state may all change.
///
/// * `exec_state` - execution state; may leave the `BlockedPut` mode when a
///   port gets unblocked.
/// * `control` - control layer that tracks outstanding `Ack`s and port
///   closing entries.
/// * `consensus` - used to annotate a pending data message before it is sent.
/// * `message` - the control message to handle.
/// * `sched_ctx` - scheduler context, used for sending messages.
/// * `comp_ctx` - context of the component; its ports and peers are modified.
pub(crate) fn default_handle_control_message(
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
) {
    match message.content {
        ControlMessageContent::Ack => {
            // Hand the `Ack` to the control layer; this may trigger follow-up
            // actions (see `default_handle_ack`).
            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
        },
        ControlMessageContent::BlockPort(port_id) => {
            // One of our messages was accepted, but the port should be
            // blocked.
            let port_handle = comp_ctx.get_port_handle(port_id);
            let port_info = comp_ctx.get_port(port_handle);
            debug_assert_eq!(port_info.kind, PortKind::Putter);
            if port_info.state == PortState::Open {
                // Only when open: we don't do this when closed, and we don't
                // do this if we're blocked due to peer changes.
                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
            }
        },
        ControlMessageContent::ClosePort(port_id) => {
            // Request to close the port. We immediately comply and remove
            // the component handle as well
            let port_handle = comp_ctx.get_port_handle(port_id);
            let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);

            // One exception to sending an `Ack` is if we just closed the
            // port ourselves, meaning that the `ClosePort` messages got
            // sent to one another. In that case the incoming message is
            // treated as the `Ack` for our own close request.
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
            } else {
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
                comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
            }
        },
        ControlMessageContent::UnblockPort(port_id) => {
            // We were previously blocked (or already closed). Only a port
            // that is blocked due to full buffers is actually unblocked here.
            let port_handle = comp_ctx.get_port_handle(port_id);
            let port_info = comp_ctx.get_port(port_handle);
            debug_assert_eq!(port_info.kind, PortKind::Putter);
            if port_info.state == PortState::BlockedDueToFullBuffers {
                default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
            }
        },
        ControlMessageContent::PortPeerChangedBlock(port_id) => {
            // The peer of our port has just changed. So we are asked to
            // temporarily block the port (while our original recipient is
            // potentially rerouting some of the in-flight messages) and
            // Ack. Then we wait for the `unblock` call.
            debug_assert_eq!(message.target_port_id, Some(port_id));
            let port_handle = comp_ctx.get_port_handle(port_id);
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange);

            let port_info = comp_ctx.get_port(port_handle);
            let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);

            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
        },
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
            // Counterpart of `PortPeerChangedBlock`: the port addressed by
            // the message must currently be blocked due to the peer change.
            let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
            let port_info = comp_ctx.get_port(port_handle);
            debug_assert!(port_info.state == PortState::BlockedDueToPeerChange);
            let old_peer_id = port_info.peer_comp_id;

            // Drop the old peer before registering the new one
            comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false);

            // Point the port at its new peer, then reopen it (which also
            // sends a pending `put`, if there is one).
            let port_info = comp_ctx.get_port_mut(port_handle);
            port_info.peer_comp_id = new_comp_id;
            port_info.peer_port_id = new_port_id;
            comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
            default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
        }
    }
}

/// Starts the exit procedure of a component: every port that is not closed
/// yet is marked as closed and its peer is notified. Must be called only once
/// per component; this is guarded by requiring the `StartExit` mode on entry
/// and moving the execution state to `BusyExit`.
///
/// Always returns `CompScheduling::Immediate`.
pub(crate) fn default_handle_start_exit(
    exec_state: &mut CompExecState, control: &mut ControlLayer,
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
) -> CompScheduling {
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
    sched_ctx.log("Component starting exit");
    exec_state.mode = CompMode::BusyExit;

    // Ports are visited by index: `comp_ctx` is borrowed again further down
    // in the loop body, which the borrowing rules would not allow while
    // holding on to a port.
    let port_count = comp_ctx.num_ports();
    for index in 0..port_count {
        let port_id = {
            let port = comp_ctx.get_port_by_index_mut(index);
            if port.state == PortState::Closed {
                // Already closed, or in the process of being closed
                continue;
            }

            port.state = PortState::Closed;
            port.self_id
        };

        // Let the peer know that the port is closing
        let port_handle = comp_ctx.get_port_handle(port_id);
        let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx);
        comp_ctx.get_peer(peer).handle.send_message(sched_ctx, Message::Control(message), true);
    }

    // Run again right away, to check if we can shut down immediately
    CompScheduling::Immediate
}

/// Handles a component that waits until all of its peers have acknowledged
/// that it is quitting (i.e. the stage after `default_handle_start_exit`).
///
/// Returns `CompScheduling::Sleep` while `Ack`s are still outstanding.
/// Otherwise moves the execution state to `CompMode::Exit` and returns
/// `CompScheduling::Exit`.
pub(crate) fn default_handle_busy_exit(
    exec_state: &mut CompExecState, control: &ControlLayer,
    sched_ctx: &SchedulerCtx
) -> CompScheduling {
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);

    if !control.has_acks_remaining() {
        sched_ctx.log("Component busy exiting, now shutting down");
        exec_state.mode = CompMode::Exit;
        return CompScheduling::Exit;
    }

    sched_ctx.log("Component busy exiting, still has `Ack`s remaining");
    CompScheduling::Sleep
}

/// Handles a component that has reached the final `Exit` mode: nothing is
/// left to do except reporting `CompScheduling::Exit`. The mode is verified
/// in debug builds only.
#[inline]
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
    CompScheduling::Exit
}

// -----------------------------------------------------------------------------
// Internal messaging/state utilities
// -----------------------------------------------------------------------------

/// Processes an `Ack` for the control layer and carries out whatever action
/// the control layer requests in response (sending a control message or
/// scheduling a component).
fn default_handle_ack(
    control: &mut ControlLayer, control_id: ControlId,
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
) {
    // Handling one `Ack` may produce the next one to handle, so keep going
    // until the control layer reports that nothing is left.
    let mut pending = Some(control_id);
    while let Some(current) = pending {
        let (action, follow_up) = control.handle_ack(current, sched_ctx, comp_ctx);

        match action {
            AckAction::None => {}
            AckAction::SendMessage(target_comp, message) => {
                // FIX @NoDirectHandle
                let mut target = sched_ctx.runtime.get_component_public(target_comp);
                target.send_message(sched_ctx, Message::Control(message), true);
                let _remove_key = target.decrement_users();
                debug_assert!(_remove_key.is_none());
            }
            AckAction::ScheduleComponent(to_schedule) => {
                // FIX @NoDirectHandle
                let mut target = sched_ctx.runtime.get_component_public(to_schedule);

                // The component is intentionally not sleeping at this point,
                // so enqueueing it is all that is needed to wake it up.
                debug_assert!(!target.sleeping.load(std::sync::atomic::Ordering::Acquire));
                // NOTE(review): the safety contract of `upgrade` is not
                // visible in this file -- confirm at its definition.
                let key = unsafe { to_schedule.upgrade() };
                sched_ctx.runtime.enqueue_work(key);
                let _remove_key = target.decrement_users();
                debug_assert!(_remove_key.is_none());
            }
        }

        pending = follow_up;
    }
}

/// Small helper that sends the most common kind of `Ack` to a peer.
///
/// * `causer_of_ack_id` - id of the control message that is acknowledged; it
///   is reused as the id of the `Ack` itself.
/// * `peer_handle` - the peer that receives the `Ack`.
fn default_send_ack(
    causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle,
    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
) {
    let ack = ControlMessage{
        id: causer_of_ack_id,
        sender_comp_id: comp_ctx.id,
        target_port_id: None,
        content: ControlMessageContent::Ack,
    };

    comp_ctx.get_peer(peer_handle).handle.send_message(sched_ctx, Message::Control(ack), true);
}

/// Reopens a blocked putter port. If the component is blocked on a `put` on
/// that very port, the pending message is sent and the component goes back
/// to the `Sync` mode.
fn default_handle_unblock_put(
    exec_state: &mut CompExecState, consensus: &mut Consensus,
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
) {
    let blocked_port = comp_ctx.get_port_mut(port_handle);
    let port_id = blocked_port.self_id;
    debug_assert!(blocked_port.state.is_blocked());
    blocked_port.state = PortState::Open;

    // Nothing is pending on this port: reopening it was all there is to do
    if !exec_state.is_blocked_on_put(port_id) {
        return;
    }

    // Take the stored value and annotate it as the message to send. The port
    // is fetched again because only immutable access is needed from here on.
    let port = comp_ctx.get_port(port_handle);
    debug_assert_eq!(port.kind, PortKind::Putter);
    let pending_value = exec_state.mode_value.take();
    let data_message = consensus.annotate_data_message(comp_ctx, port, pending_value);

    // Look up the peer and deliver the message
    let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id);
    comp_ctx.get_peer(peer_handle).handle.send_message(sched_ctx, Message::Data(data_message), true);

    // We were blocked on a `put`, so we must have started in the sync state
    exec_state.mode = CompMode::Sync;
    exec_state.mode_port = PortId::new_invalid();
}


#[inline]
pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
    return PortId(port_id.id);
}

#[inline]
pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
    return EvalPortId{ id: port_id.0 };
}