Files @ d06da4e9296c
Branch filter:

Location: CSY/reowolf/src/runtime2/component/control_layer.rs

d06da4e9296c 11.3 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
WIP: Reimplementing messaging and consensus
use crate::runtime2::runtime::*;
use crate::runtime2::communication::*;
use crate::runtime2::component::*;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) struct ControlId(u32);

impl ControlId {
    /// Like other invalid IDs, this one doesn't care any significance, but is
    /// just set at u32::MAX to hopefully bring out bugs sooner.
    fn new_invalid() -> Self {
        return ControlId(u32::MAX);
    }
}

struct ControlEntry {
    id: ControlId,
    ack_countdown: u32,
    content: ControlContent,
}

enum ControlContent {
    PeerChange(ContentPeerChange),
    ScheduleComponent(CompId),
    BlockedPort(PortId),
    ClosedPort(PortId),
}

struct ContentPeerChange {
    source_port: PortId,
    source_comp: CompId,
    target_port: PortId,
    new_target_comp: CompId,
    schedule_entry_id: ControlId,
}

pub(crate) enum AckAction {
    None,
    SendMessageAndAck(CompId, ControlMessage, ControlId),
    ScheduleComponent(CompId),
}

/// Handling/sending control messages.
pub(crate) struct ControlLayer {
    id_counter: ControlId,
    entries: Vec<ControlEntry>,
}

impl ControlLayer {
    pub(crate) fn should_reroute(&self, message: &Message) -> Option<CompId> {
        // Safety note: rerouting should occur during the time when we're
        // notifying a peer of a new component. During this period that
        // component hasn't been executed yet, so cannot have died yet.
        // FIX @NoDirectHandle
        let target_port = message.target_port();
        if target_port.is_none() {
            return None;
        }

        let target_port = target_port.unwrap();
        for entry in &self.entries {
            if let ControlContent::PeerChange(entry) = &entry.content {
                if entry.target_port == target_port {
                    return Some(entry.new_target_comp);
                }
            }
        }

        return None;
    }

    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction {
        let entry_index = self.get_entry_index(entry_id).unwrap();
        let entry = &mut self.entries[entry_index];
        debug_assert!(entry.ack_countdown > 0);

        entry.ack_countdown -= 1;
        if entry.ack_countdown != 0 {
            return AckAction::None;
        }

        // All `Ack`s received, take action based on the kind of entry
        match &entry.content {
            ControlContent::PeerChange(content) => {
                // If change of peer is ack'd. Then we are certain we have
                // rerouted all of the messages, and the sender's port can now
                // be unblocked again.
                let target_comp_id = content.source_comp;
                let message_to_send = ControlMessage{
                    id: ControlId::new_invalid(),
                    sender_comp_id: comp_ctx.id,
                    target_port_id: Some(content.source_port),
                    content: ControlMessageContent::PortPeerChangedUnblock(
                        content.source_port,
                        content.new_target_comp
                    )
                };
                let to_ack = content.schedule_entry_id;

                self.entries.remove(entry_index);
                self.handle_ack(to_ack, sched_ctx, comp_ctx);

                return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack);
            },
            ControlContent::ScheduleComponent(to_schedule) => {
                // If all change-of-peers are `Ack`d, then we're ready to
                // schedule the component!
                return AckAction::ScheduleComponent(*to_schedule);
            },
            ControlContent::BlockedPort(_) => unreachable!(),
            ControlContent::ClosedPort(port_id) => {
                // If a closed port is Ack'd, then we remove the reference to
                // that component.
                let port_index = comp_ctx.get_port_index(*port_id).unwrap();
                debug_assert_eq!(comp_ctx.ports[port_index].state, PortState::Blocked);
                let peer_id = comp_ctx.ports[port_index].peer_comp_id;
                let peer_index = comp_ctx.get_peer_index(peer_id).unwrap();
                let peer_info = &mut comp_ctx.peers[peer_index];
                peer_info.num_associated_ports -= 1;

                if peer_info.num_associated_ports == 0 {
                    let should_remove = peer_info.handle.decrement_users();
                    if should_remove {
                        let comp_key = unsafe{ peer_info.id.upgrade() };
                        sched_ctx.runtime.destroy_component(comp_key);
                    }

                    comp_ctx.peers.remove(peer_index);
                }

                return AckAction::None;
            }
        }
    }

    // -------------------------------------------------------------------------
    // Port transfer (due to component creation)
    // -------------------------------------------------------------------------

    /// Adds an entry that, when completely ack'd, will schedule a component.
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
            content: ControlContent::ScheduleComponent(to_schedule_id),
        });

        return entry_id;
    }

    /// Removes a schedule entry. Only used if the caller preemptively called
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
    /// hence the `ack_countdown` in the scheduling entry is at 0.
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
        let index = self.get_entry_index(schedule_entry_id).unwrap();
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
        self.entries.remove(index);
    }

    pub(crate) fn add_reroute_entry(
        &mut self, creator_comp_id: CompId,
        source_port_id: PortId, source_comp_id: CompId,
        target_port_id: PortId, new_comp_id: CompId,
        schedule_entry_id: ControlId,
    ) -> Message {
        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 1,
            content: ControlContent::PeerChange(ContentPeerChange{
                source_port: source_port_id,
                source_comp: source_comp_id,
                target_port: target_port_id,
                new_target_comp: new_comp_id,
                schedule_entry_id,
            }),
        });

        // increment counter on schedule entry
        for entry in &mut self.entries {
            if entry.id == schedule_entry_id {
                entry.ack_countdown += 1;
                break;
            }
        }

        return Message::Control(ControlMessage{
            id: entry_id,
            sender_comp_id: creator_comp_id,
            target_port_id: Some(source_port_id),
            content: ControlMessageContent::PortPeerChangedBlock(source_port_id)
        })
    }

    // -------------------------------------------------------------------------
    // Blocking, unblocking, and closing ports
    // -------------------------------------------------------------------------

    pub(crate) fn mark_port_closed<'a>(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
        let port = comp_ctx.get_port_mut(port_id);
        let peer_port_id = port.peer_id;
        let peer_comp_id = port.peer_comp_id;
        debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked);

        port.state = PortState::Closed;

        if peer_comp_id == comp_ctx.id {
            // We own the other end of the channel as well
            return None;
        }

        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 1,
            content: ControlContent::ClosedPort(port_id),
        });

        return Some((
            peer_comp_id,
            ControlMessage{
                id: entry_id,
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(peer_port_id),
                content: ControlMessageContent::ClosePort(peer_port_id),
            }
        ));
    }

    pub(crate) fn set_port_and_peer_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
        // TODO: Feels like this shouldn't be an entry. Hence this class should
        //  be renamed. Lets see where the code ends up being
        let entry_id = self.take_id();
        let port_info = comp_ctx.get_port_mut(port_id);
        let peer_port_id = port_info.peer_id;
        let peer_comp_id = port_info.peer_comp_id;
        debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues
        port_info.state = PortState::Blocked;

        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 0,
            content: ControlContent::BlockedPort(port_id),
        });

        return (
            peer_comp_id,
            ControlMessage{
                id: entry_id,
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(peer_port_id),
                content: ControlMessageContent::BlockPort(peer_port_id),
            }
        );
    }

    pub(crate) fn set_port_and_peer_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
        // Find the entry that contains the blocking entry for the port
        let mut entry_index = usize::MAX;
        let mut entry_id = ControlId::new_invalid();
        for (index, entry) in self.entries.iter().enumerate() {
            if let ControlContent::BlockedPort(blocked_port) = &entry.content {
                if *blocked_port == port_id {
                    entry_index = index;
                    entry_id = entry.id;
                    break;
                }
            }
        }

        let port_info = comp_ctx.get_port_mut(port_id);
        let peer_port_id = port_info.peer_id;
        let peer_comp_id = port_info.peer_comp_id;
        debug_assert_eq!(port_info.state, PortState::Blocked);
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we blocked it because of receiving too many messages
        port_info.state = PortState::Open;

        return (
            peer_comp_id,
            ControlMessage{
                id: entry_id,
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(peer_port_id),
                content: ControlMessageContent::UnblockPort(peer_port_id),
            }
        )
    }

    // -------------------------------------------------------------------------
    // Internal utilities
    // -------------------------------------------------------------------------

    fn take_id(&mut self) -> ControlId {
        let id = self.id_counter;
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
        return id;
    }

    fn get_entry_index(&self, entry_id: ControlId) -> Option<usize> {
        for (index, entry) in self.entries.iter().enumerate() {
            if entry.id == entry_id {
                return Some(index);
            }
        }

        return None;
    }
}

impl Default for ControlLayer {
    fn default() -> Self {
        return ControlLayer{
            id_counter: ControlId(0),
            entries: Vec::new(),
        }
    }
}