Files @ 6555f56a22a9
Branch filter:

Location: CSY/reowolf/src/runtime2/component/control_layer.rs - annotation

6555f56a22a9 11.3 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
mh
WIP: First sync test, partially correct
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
968e958c3286
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
0de39654770f
0de39654770f
0de39654770f
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
9e771c9cf8d3
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
0de39654770f
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
968e958c3286
0de39654770f
9e771c9cf8d3
968e958c3286
968e958c3286
c04f7fea1a62
968e958c3286
968e958c3286
c04f7fea1a62
968e958c3286
968e958c3286
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
c04f7fea1a62
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
0de39654770f
968e958c3286
968e958c3286
0de39654770f
0de39654770f
c04f7fea1a62
c04f7fea1a62
0de39654770f
0de39654770f
0de39654770f
0de39654770f
c04f7fea1a62
6555f56a22a9
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
c04f7fea1a62
0de39654770f
0de39654770f
0de39654770f
c04f7fea1a62
c04f7fea1a62
0de39654770f
0de39654770f
0de39654770f
0de39654770f
d06da4e9296c
968e958c3286
968e958c3286
968e958c3286
968e958c3286
c04f7fea1a62
c04f7fea1a62
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
c04f7fea1a62
968e958c3286
968e958c3286
968e958c3286
c04f7fea1a62
968e958c3286
968e958c3286
968e958c3286
c04f7fea1a62
c04f7fea1a62
968e958c3286
968e958c3286
968e958c3286
968e958c3286
d06da4e9296c
968e958c3286
968e958c3286
c04f7fea1a62
968e958c3286
c04f7fea1a62
c04f7fea1a62
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
c04f7fea1a62
c04f7fea1a62
968e958c3286
d06da4e9296c
968e958c3286
968e958c3286
968e958c3286
c04f7fea1a62
968e958c3286
968e958c3286
968e958c3286
c04f7fea1a62
c04f7fea1a62
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
968e958c3286
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
use crate::runtime2::runtime::*;
use crate::runtime2::communication::*;
use crate::runtime2::component::*;

/// Identifier of a control-layer entry. The same ID is carried by the control
/// messages that refer to that entry.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) struct ControlId(u32);

impl ControlId {
    /// Produces the "invalid" ID. As with other invalid IDs, the value itself
    /// doesn't carry any significance; it is pinned at `u32::MAX` in the hope
    /// of bringing out bugs sooner.
    fn new_invalid() -> Self {
        Self(u32::MAX)
    }
}

/// A single outstanding item tracked by the `ControlLayer`.
struct ControlEntry {
    /// ID under which this entry is looked up (see `get_entry_index`).
    id: ControlId,
    /// Number of `Ack`s still expected. `handle_ack` decrements this and only
    /// acts on the entry's content once it reaches zero.
    ack_countdown: u32,
    /// What the entry stands for, and hence what happens once it is ack'd.
    content: ControlContent,
}

/// The kinds of entries the control layer keeps track of.
enum ControlContent {
    /// Pending change of a port's peer, created by `add_reroute_entry` and
    /// consulted by `should_reroute`.
    PeerChange(ContentPeerChange),
    /// Component to schedule once all associated `Ack`s have been received
    /// (created by `add_schedule_entry`).
    ScheduleComponent(CompId),
    /// Port blocked through `set_port_and_peer_blocked`. Created with an
    /// `ack_countdown` of 0; `handle_ack` treats the completion of such an
    /// entry as unreachable.
    BlockedPort(PortId),
    /// Port closed through `mark_port_closed`; created with an
    /// `ack_countdown` of 1.
    ClosedPort(PortId),
}

/// Payload of a `ControlContent::PeerChange` entry (see `add_reroute_entry`).
struct ContentPeerChange {
    /// Port the `PortPeerChangedBlock` message, and later the
    /// `PortPeerChangedUnblock` message, is targeted at.
    source_port: PortId,
    /// Component that `handle_ack` names as the recipient of the unblock
    /// message once this change is fully ack'd.
    source_comp: CompId,
    /// Messages targeting this port are rerouted while the entry exists
    /// (see `should_reroute`).
    target_port: PortId,
    /// Component that `should_reroute` returns for messages sent to
    /// `target_port`.
    new_target_comp: CompId,
    /// Schedule entry whose `ack_countdown` was incremented for this reroute;
    /// `handle_ack` hands this ID back once the reroute itself is ack'd.
    schedule_entry_id: ControlId,
}

/// Follow-up action returned by `ControlLayer::handle_ack`.
pub(crate) enum AckAction {
    /// Nothing further to do.
    None,
    /// Returned when a `PeerChange` entry is fully ack'd. Carries the
    /// component the message is meant for, the message itself, and the ID of
    /// the associated schedule entry.
    /// NOTE(review): presumably the caller sends the message and then calls
    /// `handle_ack` with the returned ID - confirm against the caller.
    SendMessageAndAck(CompId, ControlMessage, ControlId),
    /// Returned when a `ScheduleComponent` entry is fully ack'd; carries the
    /// component that is now ready to be scheduled.
    ScheduleComponent(CompId),
}

/// Handling/sending control messages. Keeps track of the entries that are
/// waiting for `Ack`s and hands out the IDs that identify them.
pub(crate) struct ControlLayer {
    /// ID returned by the next call to `take_id`.
    id_counter: ControlId,
    /// Outstanding entries; looked up by a linear search on their ID.
    entries: Vec<ControlEntry>,
}

impl ControlLayer {
    /// Determines whether `message` has to be forwarded to another component.
    ///
    /// Returns the new target component when the message's target port is
    /// covered by a pending `PeerChange` entry. Returns `None` when the
    /// message has no target port or when no such entry exists.
    pub(crate) fn should_reroute(&self, message: &Message) -> Option<CompId> {
        // Safety note: rerouting should occur during the time when we're
        // notifying a peer of a new component. During this period that
        // component hasn't been executed yet, so cannot have died yet.
        // FIX @NoDirectHandle
        let wanted_port = message.target_port()?;

        // The first matching peer-change entry decides the new destination.
        return self.entries.iter().find_map(|candidate| match &candidate.content {
            ControlContent::PeerChange(change) if change.target_port == wanted_port => {
                Some(change.new_target_comp)
            },
            _ => None,
        });
    }

    /// Processes a single `Ack` for the entry identified by `entry_id`.
    ///
    /// The entry's `ack_countdown` is decremented. As long as further `Ack`s
    /// are outstanding this returns `AckAction::None`. Once the countdown
    /// reaches zero the entry is removed from the control layer and the
    /// result depends on the entry's content:
    ///
    /// - `PeerChange`: returns `AckAction::SendMessageAndAck` with the
    ///   component to notify, the `PortPeerChangedUnblock` message for it,
    ///   and the ID of the schedule entry that has to be ack'd next. That
    ///   follow-up ack is left to the caller (by calling `handle_ack` again
    ///   with the returned ID), so that the resulting action is not lost and
    ///   the schedule entry is decremented exactly once.
    /// - `ScheduleComponent`: returns `AckAction::ScheduleComponent`.
    /// - `ClosedPort`: drops the port's association with its peer in
    ///   `comp_ctx` (destroying the peer component if this was its last
    ///   user) and returns `AckAction::None`.
    ///
    /// # Panics
    /// Panics if no entry with `entry_id` exists, if a `BlockedPort` entry is
    /// completed, or if the closed port or its peer cannot be found in
    /// `comp_ctx`.
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction {
        let entry_index = self.get_entry_index(entry_id)
            .expect("received an `Ack` for an unknown control entry");
        let entry = &mut self.entries[entry_index];
        debug_assert!(entry.ack_countdown > 0);

        entry.ack_countdown -= 1;
        if entry.ack_countdown != 0 {
            return AckAction::None;
        }

        // All `Ack`s received: the entry has served its purpose, so take it
        // out of the list (otherwise completed entries pile up forever) and
        // take action based on the kind of entry.
        let entry = self.entries.remove(entry_index);

        match entry.content {
            ControlContent::PeerChange(content) => {
                // If change of peer is ack'd. Then we are certain we have
                // rerouted all of the messages, and the sender's port can now
                // be unblocked again.
                let target_comp_id = content.source_comp;
                let message_to_send = ControlMessage{
                    id: ControlId::new_invalid(),
                    sender_comp_id: comp_ctx.id,
                    target_port_id: Some(content.source_port),
                    content: ControlMessageContent::PortPeerChangedUnblock(
                        content.source_port,
                        content.new_target_comp
                    )
                };

                // Do not ack the schedule entry here: the caller does that
                // through the returned ID. Acking it here as well would
                // decrement it twice and throw away the resulting action.
                return AckAction::SendMessageAndAck(target_comp_id, message_to_send, content.schedule_entry_id);
            },
            ControlContent::ScheduleComponent(to_schedule) => {
                // If all change-of-peers are `Ack`d, then we're ready to
                // schedule the component!
                return AckAction::ScheduleComponent(to_schedule);
            },
            ControlContent::BlockedPort(_) => unreachable!(),
            ControlContent::ClosedPort(port_id) => {
                // If a closed port is Ack'd, then we remove the reference to
                // that component.
                let port_index = comp_ctx.get_port_index(port_id).unwrap();
                // `mark_port_closed` put the port in the `Closed` state when
                // it created this entry.
                debug_assert_eq!(comp_ctx.ports[port_index].state, PortState::Closed);
                let peer_id = comp_ctx.ports[port_index].peer_comp_id;
                let peer_index = comp_ctx.get_peer_index(peer_id).unwrap();
                let peer_info = &mut comp_ctx.peers[peer_index];
                peer_info.num_associated_ports -= 1;

                if peer_info.num_associated_ports == 0 {
                    // No port refers to this peer anymore, so give up our
                    // handle to it.
                    let should_remove = peer_info.handle.decrement_users();
                    if should_remove {
                        // NOTE(review): `upgrade` is unsafe; presumably this
                        // is sound because `decrement_users` just reported
                        // that we were the last user of the component -
                        // confirm against its safety contract.
                        let comp_key = unsafe{ peer_info.id.upgrade() };
                        sched_ctx.runtime.destroy_component(comp_key);
                    }

                    comp_ctx.peers.remove(peer_index);
                }

                return AckAction::None;
            }
        }
    }

    // -------------------------------------------------------------------------
    // Port transfer (due to component creation)
    // -------------------------------------------------------------------------

    /// Registers a new entry that, once it has been completely ack'd, causes
    /// the component `to_schedule_id` to be scheduled. Returns the ID of the
    /// new entry.
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
        let schedule_id = self.take_id();
        let schedule_entry = ControlEntry{
            id: schedule_id,
            // Starts at zero: each call to `add_reroute_entry` adds one.
            ack_countdown: 0,
            content: ControlContent::ScheduleComponent(to_schedule_id),
        };
        self.entries.push(schedule_entry);

        return schedule_id;
    }

    /// Discards a schedule entry again. Meant for callers that called
    /// `add_schedule_entry` up front but never followed up with a call to
    /// `add_reroute_entry`, which leaves the entry's `ack_countdown` at 0.
    ///
    /// Panics if no entry with the given ID exists.
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
        let position = self.entries.iter()
            .position(|entry| entry.id == schedule_entry_id)
            .unwrap();
        debug_assert_eq!(self.entries[position].ack_countdown, 0);
        self.entries.remove(position);
    }

    /// Registers a change-of-peer for a port that is handed to a new
    /// component, and ties it to the schedule entry `schedule_entry_id`.
    ///
    /// The new entry expects one `Ack`, and the `ack_countdown` of the
    /// schedule entry (if it is found) is raised by one. While the entry
    /// exists, `should_reroute` redirects messages aimed at `target_port_id`
    /// to `new_comp_id`.
    ///
    /// Returns the `PortPeerChangedBlock` control message, sent on behalf of
    /// `creator_comp_id` and targeted at `source_port_id`.
    pub(crate) fn add_reroute_entry(
        &mut self, creator_comp_id: CompId,
        source_port_id: PortId, source_comp_id: CompId,
        target_port_id: PortId, new_comp_id: CompId,
        schedule_entry_id: ControlId,
    ) -> Message {
        let reroute_id = self.take_id();
        let peer_change = ContentPeerChange{
            source_port: source_port_id,
            source_comp: source_comp_id,
            target_port: target_port_id,
            new_target_comp: new_comp_id,
            schedule_entry_id,
        };
        self.entries.push(ControlEntry{
            id: reroute_id,
            ack_countdown: 1,
            content: ControlContent::PeerChange(peer_change),
        });

        // The schedule entry has to wait for one more reroute to be ack'd.
        let schedule_entry = self.entries.iter_mut()
            .find(|entry| entry.id == schedule_entry_id);
        if let Some(schedule_entry) = schedule_entry {
            schedule_entry.ack_countdown += 1;
        }

        let block_message = ControlMessage{
            id: reroute_id,
            sender_comp_id: creator_comp_id,
            target_port_id: Some(source_port_id),
            content: ControlMessageContent::PortPeerChangedBlock(source_port_id)
        };
        return Message::Control(block_message);
    }

    // -------------------------------------------------------------------------
    // Blocking, unblocking, and closing ports
    // -------------------------------------------------------------------------

    /// Puts the port `port_id` in the `Closed` state and prepares the
    /// notification for its peer.
    ///
    /// Returns `None` when the peer port is owned by this very component, in
    /// which case nothing is registered. Otherwise a `ClosedPort` entry that
    /// expects a single `Ack` is added, and the peer's component ID is
    /// returned together with the `ClosePort` message carrying the entry's
    /// ID.
    ///
    /// In debug builds this asserts that the port was `Open` or `Blocked`.
    pub(crate) fn mark_port_closed(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
        let port = comp_ctx.get_port_mut(port_id);
        let peer_port_id = port.peer_id;
        let peer_comp_id = port.peer_comp_id;
        debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked);

        port.state = PortState::Closed;

        if peer_comp_id == comp_ctx.id {
            // We own the other end of the channel as well.
            return None;
        }

        // The entry stays around until the peer acknowledges the closing.
        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 1,
            content: ControlContent::ClosedPort(port_id),
        });

        return Some((
            peer_comp_id,
            ControlMessage{
                id: entry_id,
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(peer_port_id),
                content: ControlMessageContent::ClosePort(peer_port_id),
            }
        ));
    }

    /// Puts the port `port_id` in the `Blocked` state and registers a
    /// `BlockedPort` entry for it (with an `ack_countdown` of 0).
    ///
    /// Returns the peer's component ID together with the `BlockPort` message
    /// that is targeted at the peer port and carries the new entry's ID.
    ///
    /// In debug builds this asserts that the port was `Open`.
    pub(crate) fn set_port_and_peer_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
        // TODO: Feels like this shouldn't be an entry. Hence this class should
        //  be renamed. Let's see where the code ends up being
        let block_id = self.take_id();

        let port = comp_ctx.get_port_mut(port_id);
        let (peer_port, peer_comp) = (port.peer_id, port.peer_comp_id);
        debug_assert_eq!(port.state, PortState::Open); // prevent unforeseen issues
        port.state = PortState::Blocked;

        let block_entry = ControlEntry{
            id: block_id,
            ack_countdown: 0,
            content: ControlContent::BlockedPort(port_id),
        };
        self.entries.push(block_entry);

        let block_message = ControlMessage{
            id: block_id,
            sender_comp_id: comp_ctx.id,
            target_port_id: Some(peer_port),
            content: ControlMessageContent::BlockPort(peer_port),
        };
        return (peer_comp, block_message);
    }

    /// Puts the blocked port `port_id` back in the `Open` state and retires
    /// the `BlockedPort` entry that was registered for it.
    ///
    /// Returns the peer's component ID together with the `UnblockPort`
    /// message targeted at the peer port. The message carries the ID of the
    /// retired entry, or the invalid ID if no blocking entry exists for the
    /// port.
    ///
    /// In debug builds this asserts that the port was `Blocked` and is a
    /// getter port.
    pub(crate) fn set_port_and_peer_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
        // Find the entry that contains the blocking entry for the port
        let blocking_index = self.entries.iter().position(|entry| match &entry.content {
            ControlContent::BlockedPort(blocked_port) => *blocked_port == port_id,
            _ => false,
        });

        // The port is no longer blocked, so its entry has to go. Leaving it
        // in place would leak one entry per block/unblock cycle, and a later
        // unblock of the same port would pick up the stale entry's ID.
        let entry_id = match blocking_index {
            Some(index) => self.entries.remove(index).id,
            None => ControlId::new_invalid(),
        };

        let port_info = comp_ctx.get_port_mut(port_id);
        let peer_port_id = port_info.peer_id;
        let peer_comp_id = port_info.peer_comp_id;
        debug_assert_eq!(port_info.state, PortState::Blocked);
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we blocked it because of receiving too many messages
        port_info.state = PortState::Open;

        return (
            peer_comp_id,
            ControlMessage{
                id: entry_id,
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(peer_port_id),
                content: ControlMessageContent::UnblockPort(peer_port_id),
            }
        )
    }

    // -------------------------------------------------------------------------
    // Internal utilities
    // -------------------------------------------------------------------------

    fn take_id(&mut self) -> ControlId {
        let id = self.id_counter;
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
        return id;
    }

    /// Looks up the position in `entries` of the first entry whose ID equals
    /// `entry_id`. Returns `None` if there is no such entry.
    fn get_entry_index(&self, entry_id: ControlId) -> Option<usize> {
        return self.entries.iter().position(|candidate| candidate.id == entry_id);
    }
}

impl Default for ControlLayer {
    /// Creates a control layer without any entries, whose ID counter starts
    /// at 0.
    fn default() -> Self {
        Self{
            id_counter: ControlId(0),
            entries: Vec::new(),
        }
    }
}