Files @ e7df1d2ae35f
Branch filter:

Location: CSY/reowolf/src/runtime2/component/control_layer.rs - annotation

e7df1d2ae35f 11.4 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
mh
WIP: Updated port management to be more maintainable
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
e7df1d2ae35f
e7df1d2ae35f
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
968e958c3286
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
0de39654770f
0de39654770f
0de39654770f
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
9e771c9cf8d3
968e958c3286
bf4c0ee5ba65
bf4c0ee5ba65
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
bf4c0ee5ba65
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
bf4c0ee5ba65
bf4c0ee5ba65
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
0de39654770f
e7df1d2ae35f
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
bf4c0ee5ba65
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
968e958c3286
968e958c3286
c04f7fea1a62
968e958c3286
968e958c3286
c04f7fea1a62
968e958c3286
968e958c3286
0de39654770f
0de39654770f
0de39654770f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
0de39654770f
0de39654770f
0de39654770f
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
c04f7fea1a62
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
e7df1d2ae35f
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
bf4c0ee5ba65
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
bf4c0ee5ba65
bf4c0ee5ba65
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
0de39654770f
968e958c3286
968e958c3286
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
c04f7fea1a62
0de39654770f
0de39654770f
0de39654770f
0de39654770f
c04f7fea1a62
6555f56a22a9
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
c04f7fea1a62
0de39654770f
0de39654770f
0de39654770f
c04f7fea1a62
c04f7fea1a62
0de39654770f
0de39654770f
0de39654770f
0de39654770f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
c04f7fea1a62
e7df1d2ae35f
968e958c3286
e7df1d2ae35f
968e958c3286
968e958c3286
968e958c3286
e7df1d2ae35f
968e958c3286
968e958c3286
968e958c3286
e7df1d2ae35f
968e958c3286
968e958c3286
968e958c3286
e7df1d2ae35f
c04f7fea1a62
968e958c3286
968e958c3286
968e958c3286
968e958c3286
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
968e958c3286
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
968e958c3286
e7df1d2ae35f
e7df1d2ae35f
968e958c3286
968e958c3286
e7df1d2ae35f
968e958c3286
e7df1d2ae35f
968e958c3286
e7df1d2ae35f
e7df1d2ae35f
968e958c3286
e7df1d2ae35f
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
968e958c3286
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
e7df1d2ae35f
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
use crate::runtime2::runtime::*;
use crate::runtime2::communication::*;
use crate::runtime2::component::*;

use super::component_context::*;

/// Identifier of an entry tracked by the control layer. Control messages that
/// are created alongside an entry carry the same ID.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) struct ControlId(u32);

impl ControlId {
    /// Produces the ID that stands for "not a valid entry". Like other invalid
    /// IDs, the value itself doesn't carry any significance: it is simply
    /// `u32::MAX`, in the hope that accidental use brings out bugs sooner.
    fn new_invalid() -> Self {
        Self(u32::MAX)
    }
}

/// A single pending piece of control-protocol bookkeeping. Entries are stored
/// in `ControlLayer::entries` and are looked up by `id` when an `Ack` arrives.
struct ControlEntry {
    /// ID handed out by `ControlLayer::take_id`. Where a control message is
    /// created alongside the entry, that message carries the same ID.
    id: ControlId,
    /// Number of `Ack`s that are still expected. `ControlLayer::handle_ack`
    /// decrements this and acts on the entry once it reaches zero.
    ack_countdown: u32,
    /// What is being tracked, and hence what has to happen once the entry is
    /// completely ack'd.
    content: ControlContent,
}

/// The kind of operation a `ControlEntry` keeps track of.
enum ControlContent {
    /// A port's peer is being changed (created by `add_reroute_entry`). While
    /// the entry exists, `should_reroute` redirects messages that are still
    /// addressed to the old target port.
    PeerChange(ContentPeerChange),
    /// The component to schedule once all `Ack`s for this entry are in. The
    /// entry starts with a countdown of zero (`add_schedule_entry`), which is
    /// incremented by every `add_reroute_entry` call that refers to it.
    ScheduleComponent(CompId),
    /// The local port with this ID is blocked (created by
    /// `initiate_port_blocking`, removed by `cancel_port_blocking`). Created
    /// with a countdown of zero; `handle_ack` treats a completed ack for this
    /// variant as unreachable.
    BlockedPort(PortId),
    /// The local port with this ID was closed by `initiate_port_closing` and
    /// the peer has been sent a `ClosePort` message that awaits its `Ack`.
    ClosedPort(PortId),
}

/// Bookkeeping for a peer change that is in progress (see
/// `ControlLayer::add_reroute_entry`).
struct ContentPeerChange {
    /// Port that is told about the change: it is the target of the
    /// `PortPeerChangedBlock` message and, later, of the
    /// `PortPeerChangedUnblock` message.
    source_port: PortId,
    /// Component the `PortPeerChangedUnblock` message is sent to.
    source_comp: CompId,
    /// Messages whose target port equals this one are rerouted by
    /// `ControlLayer::should_reroute`.
    old_target_port: PortId,
    /// Port that rerouted messages are redirected to.
    new_target_port: PortId,
    /// Component that rerouted messages are redirected to.
    new_target_comp: CompId,
    /// ID of the `ScheduleComponent` entry that is ack'd in turn once this
    /// peer change has been ack'd.
    schedule_entry_id: ControlId,
}

/// Follow-up action that `ControlLayer::handle_ack` hands back to its caller.
pub(crate) enum AckAction {
    /// Nothing further has to happen.
    None,
    /// A message has to go to the given component, and the control entry with
    /// the given ID has to be ack'd. `handle_ack` produces this when a peer
    /// change completes: the message is the `PortPeerChangedUnblock` and the
    /// ID is that of the associated schedule entry.
    /// NOTE(review): how exactly the caller performs the send/ack is not
    /// visible in this file.
    SendMessageAndAck(CompId, ControlMessage, ControlId),
    /// The given component is ready to be scheduled.
    ScheduleComponent(CompId),
}

/// Handling/sending control messages. Keeps a list of pending control entries
/// (reroutes, deferred scheduling, blocked and closed ports) together with the
/// counter used to hand out their IDs.
pub(crate) struct ControlLayer {
    /// Next ID to be handed out by `take_id`.
    id_counter: ControlId,
    /// All pending entries. Unordered with respect to ID; lookups are linear
    /// scans.
    entries: Vec<ControlEntry>,
}

impl ControlLayer {
    /// Checks whether `message` is addressed to a port whose peer is currently
    /// being changed. If so, the message's target port is rewritten to the new
    /// target port and the ID of the component that should receive it is
    /// returned. Messages without a target port, or whose target port is not
    /// subject to a pending peer change, are left alone and yield `None`.
    pub(crate) fn should_reroute(&self, message: &mut Message) -> Option<CompId> {
        // Safety note: rerouting takes place while a peer is being notified of
        // a newly created component. That component has not been executed
        // during this period, hence it cannot have died yet.
        // FIX @NoDirectHandle
        let addressed_port = message.target_port()?;

        // First pending peer change (in insertion order) for this port wins
        let change = self.entries.iter().find_map(|candidate| match &candidate.content {
            ControlContent::PeerChange(change) if change.old_target_port == addressed_port => Some(change),
            _ => None,
        })?;

        message.modify_target_port(change.new_target_port);
        Some(change.new_target_comp)
    }

    /// Processes an `Ack` for the control entry with ID `entry_id`.
    ///
    /// Every call lowers the entry's `ack_countdown` by one. While acks are
    /// still outstanding nothing else happens and `AckAction::None` is
    /// returned. Once the final ack has arrived the entry is removed from the
    /// bookkeeping (whatever its kind) and the follow-up depends on its
    /// content:
    ///
    /// - `PeerChange`: returns `AckAction::SendMessageAndAck` holding the
    ///   component to notify, the `PortPeerChangedUnblock` message for it, and
    ///   the ID of the schedule entry that is to be ack'd in turn.
    /// - `ScheduleComponent`: returns `AckAction::ScheduleComponent`.
    /// - `ClosedPort`: removes the peer associated with the closed port
    ///   through `comp_ctx.remove_peer` and returns `AckAction::None`.
    /// - `BlockedPort`: such entries never await acks, so this is unreachable.
    ///
    /// # Panics
    ///
    /// Panics if there is no entry with ID `entry_id`. In debug builds it also
    /// panics if the entry was not expecting any (more) acks.
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction {
        let entry_index = self.get_entry_index_by_id(entry_id)
            .expect("received an `Ack` for a control entry that does not exist");
        let entry = &mut self.entries[entry_index];
        debug_assert!(entry.ack_countdown > 0);

        entry.ack_countdown -= 1;
        if entry.ack_countdown != 0 {
            return AckAction::None;
        }

        // All `Ack`s received: the entry has served its purpose. Remove it for
        // every kind of entry, otherwise completed schedule/closed-port
        // entries pile up in `entries` and their stale IDs linger around.
        let entry = self.entries.remove(entry_index);

        // Take action based on the kind of entry
        match entry.content {
            ControlContent::PeerChange(content) => {
                // If change of peer is ack'd. Then we are certain we have
                // rerouted all of the messages, and the sender's port can now
                // be unblocked again.
                let message_to_send = ControlMessage{
                    // the unblock message is not tracked by an entry itself
                    id: ControlId::new_invalid(),
                    sender_comp_id: comp_ctx.id,
                    target_port_id: Some(content.source_port),
                    content: ControlMessageContent::PortPeerChangedUnblock(
                        content.new_target_port,
                        content.new_target_comp
                    )
                };

                return AckAction::SendMessageAndAck(content.source_comp, message_to_send, content.schedule_entry_id);
            },
            ControlContent::ScheduleComponent(to_schedule) => {
                // If all change-of-peers are `Ack`d, then we're ready to
                // schedule the component!
                return AckAction::ScheduleComponent(to_schedule);
            },
            // Blocked-port entries are created with a countdown of zero and
            // are removed through `cancel_port_blocking`, never through acks.
            ControlContent::BlockedPort(_) => unreachable!(),
            ControlContent::ClosedPort(port_id) => {
                // If a closed port is Ack'd, then we remove the reference to
                // that component.
                let port_handle = comp_ctx.get_port_handle(port_id);
                let port_info = comp_ctx.get_port(port_handle);
                debug_assert_eq!(port_info.state, PortState::Closed);
                comp_ctx.remove_peer(sched_ctx, port_handle, port_info.peer_comp_id);

                return AckAction::None;
            }
        }
    }

    // -------------------------------------------------------------------------
    // Port transfer (due to component creation)
    // -------------------------------------------------------------------------

    /// Registers an entry that schedules component `to_schedule_id` as soon as
    /// it has been completely ack'd, and returns the ID of that entry. The
    /// entry starts out expecting zero acks: each `add_reroute_entry` call
    /// that refers to the returned ID raises the count by one.
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
        let schedule_id = self.take_id();
        let schedule_entry = ControlEntry{
            id: schedule_id,
            ack_countdown: 0,
            content: ControlContent::ScheduleComponent(to_schedule_id),
        };
        self.entries.push(schedule_entry);

        schedule_id
    }

    /// Drops a schedule entry again. Intended for callers that preemptively
    /// called `add_schedule_entry` but never followed up with
    /// `add_reroute_entry`, meaning the entry's `ack_countdown` is still 0
    /// (checked in debug builds). Panics if the entry does not exist.
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
        let slot = self.get_entry_index_by_id(schedule_entry_id).unwrap();
        let pending_acks = self.entries[slot].ack_countdown;
        debug_assert_eq!(pending_acks, 0);
        self.entries.remove(slot);
    }

    /// Registers a pending peer change and returns the control message that
    /// announces it.
    ///
    /// The new entry expects a single `Ack` and makes `should_reroute`
    /// redirect messages for `old_target_port_id` to `new_target_port_id` on
    /// component `new_comp_id`. The schedule entry identified by
    /// `schedule_entry_id` (if present) is made to wait for one additional
    /// ack. The returned `PortPeerChangedBlock` message is sent on behalf of
    /// `creator_comp_id`, targets `source_port_id` and carries the ID of the
    /// new entry.
    pub(crate) fn add_reroute_entry(
        &mut self, creator_comp_id: CompId,
        source_port_id: PortId, source_comp_id: CompId,
        old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId,
        schedule_entry_id: ControlId,
    ) -> Message {
        let reroute_id = self.take_id();
        let peer_change = ContentPeerChange{
            source_port: source_port_id,
            source_comp: source_comp_id,
            old_target_port: old_target_port_id,
            new_target_port: new_target_port_id,
            new_target_comp: new_comp_id,
            schedule_entry_id,
        };
        self.entries.push(ControlEntry{
            id: reroute_id,
            ack_countdown: 1,
            content: ControlContent::PeerChange(peer_change),
        });

        // The schedule entry has to wait for the ack of this reroute as well
        let schedule_entry = self.entries.iter_mut()
            .find(|candidate| candidate.id == schedule_entry_id);
        if let Some(schedule_entry) = schedule_entry {
            schedule_entry.ack_countdown += 1;
        }

        let block_message = ControlMessage{
            id: reroute_id,
            sender_comp_id: creator_comp_id,
            target_port_id: Some(source_port_id),
            content: ControlMessageContent::PortPeerChangedBlock(source_port_id)
        };
        Message::Control(block_message)
    }

    // -------------------------------------------------------------------------
    // Blocking, unblocking, and closing ports
    // -------------------------------------------------------------------------

    /// Marks the port behind `port_handle` as closed and, if the peer port
    /// lives in another component, prepares the notification for that peer.
    ///
    /// The port is expected to be `Open` or `Blocked` (checked in debug
    /// builds). Returns `None` when this component owns the other end of the
    /// channel too. Otherwise an entry awaiting one `Ack` is registered and
    /// the peer's component ID is returned together with the `ClosePort`
    /// message that is meant for it.
    pub(crate) fn initiate_port_closing(&mut self, port_handle: PortHandle, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
        let (closed_port_id, peer_port_id, peer_comp_id) = {
            let port = comp_ctx.get_port_mut(port_handle);
            debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked);
            port.state = PortState::Closed;

            (port.self_id, port.peer_port_id, port.peer_comp_id)
        };

        if peer_comp_id == comp_ctx.id {
            // Both ends of the channel are ours: nobody to notify.
            return None;
        }

        let close_id = self.take_id();
        self.entries.push(ControlEntry{
            id: close_id,
            ack_countdown: 1,
            content: ControlContent::ClosedPort(closed_port_id),
        });

        let close_message = ControlMessage{
            id: close_id,
            sender_comp_id: comp_ctx.id,
            target_port_id: Some(peer_port_id),
            content: ControlMessageContent::ClosePort(peer_port_id),
        };
        Some((peer_comp_id, close_message))
    }

    /// Starts tracking that the port behind `port_handle` is blocked. The
    /// caller must already have put the port (a getter) into the blocked
    /// state; both conditions are checked in debug builds. Returns the handle
    /// of the peer together with the `BlockPort` message that is to be sent
    /// to it.
    pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
        let port_info = comp_ctx.get_port(port_handle);
        // We are the getter, telling the putter on the other side to block
        debug_assert_eq!(port_info.kind, PortKind::Getter);
        // Contract with the caller
        debug_assert_eq!(port_info.state, PortState::Blocked);

        let peer_port_id = port_info.peer_port_id;
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);

        let block_id = self.take_id();
        let block_entry = ControlEntry{
            id: block_id,
            ack_countdown: 0,
            content: ControlContent::BlockedPort(port_info.self_id),
        };
        self.entries.push(block_entry);

        let block_message = ControlMessage{
            id: block_id,
            sender_comp_id: comp_ctx.id,
            target_port_id: Some(peer_port_id),
            content: ControlMessageContent::BlockPort(peer_port_id),
        };
        (peer_handle, block_message)
    }

    /// Stops tracking that the port behind `port_handle` is blocked. The
    /// caller must already have marked the port (a getter) as open again; both
    /// conditions are checked in debug builds. Panics if no blocked-port entry
    /// exists for the port. Returns the handle of the peer together with the
    /// `UnblockPort` message that is to be sent to it; the message reuses the
    /// ID of the removed entry.
    pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
        let port_info = comp_ctx.get_port(port_handle);
        // We are the getter, hence the side that initiates the unblocking
        debug_assert_eq!(port_info.kind, PortKind::Getter);
        // Contract with the caller; the locally stored entry ensures that we
        // were blocked before
        debug_assert_eq!(port_info.state, PortState::Open);

        let unblocked_port_id = port_info.self_id;
        let entry_index = self.entries.iter()
            .position(|candidate| matches!(
                &candidate.content,
                ControlContent::BlockedPort(blocked_port_id) if *blocked_port_id == unblocked_port_id
            ))
            .unwrap();

        let removed_entry = self.entries.remove(entry_index);
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);

        let unblock_message = ControlMessage{
            id: removed_entry.id,
            sender_comp_id: comp_ctx.id,
            target_port_id: Some(port_info.peer_port_id),
            content: ControlMessageContent::UnblockPort(port_info.peer_port_id)
        };
        (peer_handle, unblock_message)
    }

    // -------------------------------------------------------------------------
    // Internal utilities
    // -------------------------------------------------------------------------

    fn take_id(&mut self) -> ControlId {
        let id = self.id_counter;
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
        return id;
    }

    /// Returns the position in `entries` of the first entry whose ID equals
    /// `entry_id`, or `None` if there is no such entry.
    fn get_entry_index_by_id(&self, entry_id: ControlId) -> Option<usize> {
        self.entries.iter().position(|candidate| candidate.id == entry_id)
    }
}

impl Default for ControlLayer {
    /// Creates a control layer without any pending entries, whose ID counter
    /// starts at 0.
    fn default() -> Self {
        Self {
            entries: Vec::new(),
            id_counter: ControlId(0),
        }
    }
}