Files @ 73e54818a189
Branch filter:

Location: CSY/reowolf/src/runtime2/component/control_layer.rs

73e54818a189 12.4 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
Max Henger
Merge branch 'feat-full-documentation' into 'master'

feat: full documentation

See merge request nl-cwi-csy/reowolf!11
use crate::runtime2::runtime::*;
use crate::runtime2::communication::*;
use crate::runtime2::component::*;

use super::component_context::*;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) struct ControlId(pub(crate) u32);

impl ControlId {
    /// Like other invalid IDs, this one doesn't care any significance, but is
    /// just set at u32::MAX to hopefully bring out bugs sooner.
    pub(crate) fn new_invalid() -> Self {
        return ControlId(u32::MAX);
    }
}

struct ControlEntry {
    id: ControlId,
    ack_countdown: u32,
    content: ControlContent,
}

enum ControlContent {
    PeerChange(ContentPeerChange),
    ScheduleComponent(CompId),
    ClosedPort(PortId),
    UnblockPutWithPorts
}

struct ContentPeerChange {
    source_port: PortId,
    source_comp: CompId,
    old_target_port: PortId,
    new_target_port: PortId,
    new_target_comp: CompId,
    schedule_entry_id: ControlId,
}

struct ControlClosedPort {
    closed_port: PortId,
    exit_entry_id: Option<ControlId>,
}

pub(crate) enum AckAction {
    None,
    SendMessage(CompId, ControlMessage),
    ScheduleComponent(CompId),
    UnblockPutWithPorts,
}

/// Handling/sending control messages.
pub(crate) struct ControlLayer {
    id_counter: ControlId,
    entries: Vec<ControlEntry>,
}

impl ControlLayer {
    pub(crate) fn should_reroute(&self, message: &mut Message) -> Option<CompId> {
        // Safety note: rerouting should occur during the time when we're
        // notifying a peer of a new component. During this period that
        // component hasn't been executed yet, so cannot have died yet.
        // FIX @NoDirectHandle
        let target_port = message.target_port();
        if target_port.is_none() {
            return None;
        }

        let target_port = target_port.unwrap();
        for entry in &self.entries {
            if let ControlContent::PeerChange(entry) = &entry.content {
                if entry.old_target_port == target_port {
                    message.modify_target_port(entry.new_target_port);
                    return Some(entry.new_target_comp);
                }
            }
        }

        return None;
    }

    /// Handles an acknowledgement. The returned action must be performed by the
    /// caller. The optionally returned `ControlId` must be used and passed to
    /// `handle_ack` again.
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> (AckAction, Option<ControlId>) {
        let entry_index = self.get_entry_index_by_id(entry_id).unwrap();
        let entry = &mut self.entries[entry_index];
        debug_assert!(entry.ack_countdown > 0);

        entry.ack_countdown -= 1;
        if entry.ack_countdown != 0 {
            return (AckAction::None, None);
        }

        let entry = self.entries.remove(entry_index);

        // All `Ack`s received, take action based on the kind of entry
        match entry.content {
            ControlContent::PeerChange(content) => {
                // If change of peer is ack'd. Then we are certain we have
                // rerouted all of the messages, and the sender's port can now
                // be unblocked again.
                let target_comp_id = content.source_comp;
                let message_to_send = ControlMessage{
                    id: ControlId::new_invalid(),
                    sender_comp_id: comp_ctx.id,
                    target_port_id: Some(content.source_port),
                    content: ControlMessageContent::PortPeerChangedUnblock(
                        content.new_target_port,
                        content.new_target_comp
                    )
                };
                let to_ack = content.schedule_entry_id;

                return (
                    AckAction::SendMessage(target_comp_id, message_to_send),
                    Some(to_ack)
                );
            },
            ControlContent::ScheduleComponent(to_schedule) => {
                // If all change-of-peers are `Ack`d, then we're ready to
                // schedule the component!
                return (AckAction::ScheduleComponent(to_schedule), None);
            },
            ControlContent::ClosedPort(closed_port) => {
                // If a closed port is Ack'd, then we remove the reference to
                // that component.
                let port_handle = comp_ctx.get_port_handle(closed_port);
                debug_assert!(comp_ctx.get_port(port_handle).state.is_closed());
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);

                return (AckAction::None, None);
            },
            ControlContent::UnblockPutWithPorts => {
                return (AckAction::UnblockPutWithPorts, None);
            }
        }
    }

    pub(crate) fn has_acks_remaining(&self) -> bool {
        return !self.entries.is_empty();
    }

    // -------------------------------------------------------------------------
    // Port transfer
    // -------------------------------------------------------------------------

    /// Adds an entry that, when completely ack'd, will schedule a component.
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
            content: ControlContent::ScheduleComponent(to_schedule_id),
        });

        return entry_id;
    }

    /// Adds an entry that returns the similarly named Ack action
    pub(crate) fn add_unblock_put_with_ports_entry(&mut self) -> ControlId {
        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
            content: ControlContent::UnblockPutWithPorts,
        });

        return entry_id;
    }

    /// Adds a rerouting entry (used to ensure all messages will end up at a
    /// newly created component). Used when creating a new component.
    pub(crate) fn add_reroute_entry(
        &mut self, creator_comp_id: CompId,
        source_port_id: PortId, source_comp_id: CompId,
        old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId,
        schedule_entry_id: ControlId,
    ) -> Message {
        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 1,
            content: ControlContent::PeerChange(ContentPeerChange{
                source_port: source_port_id,
                source_comp: source_comp_id,
                old_target_port: old_target_port_id,
                new_target_port: new_target_port_id,
                new_target_comp: new_comp_id,
                schedule_entry_id,
            }),
        });

        // increment counter on schedule entry
        let entry_index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
        self.entries[entry_index].ack_countdown += 1;

        return Message::Control(ControlMessage{
            id: entry_id,
            sender_comp_id: creator_comp_id,
            target_port_id: Some(source_port_id),
            content: ControlMessageContent::PortPeerChangedBlock
        })
    }

    /// Creates a PortPeerChanged message (and increments ack-counter on a
    /// pre-created control entry) that is used as a preliminary step before
    /// transferring a port over a channel.
    pub(crate) fn create_port_transfer_message(
        &mut self, associated_control_id: ControlId,
        sender_comp_id: CompId, target_port_id: PortId
    ) -> Message {
        let entry_index = self.get_entry_index_by_id(associated_control_id).unwrap();
        self.entries[entry_index].ack_countdown += 1;

        return Message::Control(ControlMessage{
            id: associated_control_id,
            sender_comp_id,
            target_port_id: Some(target_port_id),
            content: ControlMessageContent::PortPeerChangedBlock
        })
    }

    // -------------------------------------------------------------------------
    // Blocking, unblocking, and closing ports
    // -------------------------------------------------------------------------

    pub(crate) fn has_close_port_entry(&self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> Option<ControlId> {
        let port = comp_ctx.get_port(port_handle);
        let port_id = port.self_id;
        for entry in self.entries.iter() {
            if let ControlContent::ClosedPort(entry_port_id) = &entry.content {
                if *entry_port_id == port_id {
                    return Some(entry.id);
                }
            }
        }

        return None;
    }

    /// Initiates the control message procedures for closing a port. Caller must
    /// make sure that the port state has already been set to `Closed`.
    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, exit_inside_sync: bool, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
        let port = comp_ctx.get_port(port_handle);
        let peer_port_id = port.peer_port_id;
        debug_assert!(port.state.is_closed());

        // Construct the port-closing entry
        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 1,
            content: ControlContent::ClosedPort(port.self_id),
        });

        // Return the messages notifying peer of the closed port
        let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id);
        return (
            peer_handle,
            ControlMessage{
                id: entry_id,
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(peer_port_id),
                content: ControlMessageContent::ClosePort(ControlMessageClosePort{
                    closed_in_sync_round: exit_inside_sync,
                }),
            }
        );
    }

    /// Generates the control message used to indicate to a peer that a port
    /// should be blocked (expects the caller to have set the port's state to
    /// blocked).
    pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
        let port_info = comp_ctx.get_port(port_handle);
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block
        debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); // contract with caller

        let peer_port_id = port_info.peer_port_id;
        let peer_comp_id = port_info.peer_comp_id;
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);

        return (
            peer_handle,
            ControlMessage{
                id: ControlId::new_invalid(),
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(peer_port_id),
                content: ControlMessageContent::BlockPort,
            }
        );
    }

    /// Generates a messages used to indicate to a peer that a port should be
    /// unblocked again.
    pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
        let port_info = comp_ctx.get_port(port_handle);
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking

        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);

        return (
            peer_handle,
            ControlMessage{
                id: ControlId::new_invalid(),
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(port_info.peer_port_id),
                content: ControlMessageContent::UnblockPort,
            }
        );
    }

    // -------------------------------------------------------------------------
    // Internal utilities
    // -------------------------------------------------------------------------

    fn take_id(&mut self) -> ControlId {
        let id = self.id_counter;
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
        return id;
    }

    fn get_entry_index_by_id(&self, entry_id: ControlId) -> Option<usize> {
        for (index, entry) in self.entries.iter().enumerate() {
            if entry.id == entry_id {
                return Some(index);
            }
        }

        return None;
    }
}

impl Default for ControlLayer {
    fn default() -> Self {
        return ControlLayer{
            id_counter: ControlId(0),
            entries: Vec::new(),
        }
    }
}