Files @ e62f669c1e03
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component_context.rs

e62f669c1e03 11.1 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
WIP on refactoring port transmission code
use std::fmt::{Debug, Formatter, Result as FmtResult};

use crate::runtime2::scheduler::*;
use crate::runtime2::runtime::*;
use crate::runtime2::communication::*;

use crate::protocol::ExpressionId;

/// Helper enum used to remember the last operation that took place on a port
/// (stored in `Port::last_instruction`).
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum PortInstruction {
    /// Initial value given to every freshly created port.
    None,
    /// NOTE(review): presumably an operation without an associated source
    /// expression — confirm against the call sites.
    NoSource,
    /// An operation tagged with the `ExpressionId` it carries.
    SourceLocation(ExpressionId),
}

impl PortInstruction {
    /// Returns `true` only when this is the `PortInstruction::None` variant.
    pub fn is_none(&self) -> bool {
        matches!(self, PortInstruction::None)
    }
}

/// Directionality of a port, i.e. which end of a channel it represents.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PortKind {
    // Kind of the port that `CompCtx::create_channel` returns as `putter_id`
    // (presumably the sending end — confirm against the communication code).
    Putter,
    // Kind of the port that `CompCtx::create_channel` returns as `getter_id`
    // (presumably the receiving end — confirm against the communication code).
    Getter,
}

/// Bitflags for a port. The discriminant of each variant is the bit mask that
/// `PortState::set`, `PortState::clear` and `PortState::is_set` apply to the
/// underlying `u32` flag word.
// TODO: Incorporate remaining flags from `Port` struct
#[repr(u32)]
#[derive(Debug, Copy, Clone)]
pub enum PortStateFlag {
    Closed = 0x01, // If not closed, then the port is open
    BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked
    BlockedDueToFullBuffers = 0x04,
    Transmitted = 0x08, // Transmitted, so cannot be used anymore
}

/// Compact set of `PortStateFlag`s, stored as a bit mask in a single `u32`.
#[derive(Copy, Clone)]
pub struct PortState {
    flags: u32
}

impl PortState {
    /// Creates a state with no flags set (i.e. open, not blocked, not
    /// transmitted).
    pub(crate) fn new() -> PortState {
        PortState { flags: 0 }
    }

    // high-level

    /// Returns `true` if the `Closed` flag is not set.
    #[inline]
    pub fn is_open(&self) -> bool {
        !self.is_closed()
    }

    /// Returns `true` if the port is neither closed nor transmitted. Note that
    /// the blocked flags are deliberately not consulted here; use
    /// `is_blocked` for those.
    #[inline]
    pub fn can_send(&self) -> bool {
        !self.is_set(PortStateFlag::Closed) && !self.is_set(PortStateFlag::Transmitted)
    }

    /// Returns `true` if the `Closed` flag is set.
    #[inline]
    pub fn is_closed(&self) -> bool {
        self.is_set(PortStateFlag::Closed)
    }

    /// Returns `true` if either of the two blocking flags
    /// (`BlockedDueToPeerChange`, `BlockedDueToFullBuffers`) is set.
    #[inline]
    pub fn is_blocked(&self) -> bool {
        self.is_set(PortStateFlag::BlockedDueToPeerChange)
            || self.is_set(PortStateFlag::BlockedDueToFullBuffers)
    }

    // lower-level utils

    /// Sets `flag`, leaving all other flags untouched.
    #[inline]
    pub fn set(&mut self, flag: PortStateFlag) {
        self.flags |= flag as u32;
    }

    /// Clears `flag`, leaving all other flags untouched.
    #[inline]
    pub fn clear(&mut self, flag: PortStateFlag) {
        self.flags &= !(flag as u32);
    }

    /// Returns `true` if `flag` is currently set.
    #[inline]
    pub const fn is_set(&self, flag: PortStateFlag) -> bool {
        (self.flags & (flag as u32)) != 0
    }
}

impl Debug for PortState {
    /// Formats the state as a struct with one boolean field per flag, so the
    /// output shows every flag by name instead of the raw bit mask.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        // Every variant of `PortStateFlag` must be listed here; a flag that is
        // missing from this table is invisible in debug output.
        const NAMED_FLAGS: [(&str, PortStateFlag); 4] = [
            ("closed", PortStateFlag::Closed),
            ("blocked_peer_change", PortStateFlag::BlockedDueToPeerChange),
            ("blocked_full_buffers", PortStateFlag::BlockedDueToFullBuffers),
            ("transmitted", PortStateFlag::Transmitted),
        ];

        let mut s = f.debug_struct("PortState");
        for (flag_name, flag) in NAMED_FLAGS.iter() {
            s.field(flag_name, &self.is_set(*flag));
        }

        s.finish()
    }
}

/// A single port as stored inside a `CompCtx`: its own identifier, the
/// identifiers of its peer, its direction and state flags, and bookkeeping
/// fields used for error detection during sync rounds.
#[derive(Debug)]
pub struct Port {
    // Identifiers
    pub self_id: PortId,
    pub peer_comp_id: CompId, // eventually consistent
    pub peer_port_id: PortId, // eventually consistent
    // Generic operating state
    pub kind: PortKind,
    pub state: PortState,
    // State tracking for error detection and error handling
    pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors
    pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors
    pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message
    // True while this port is counted in the `num_associated_ports` of a
    // `Peer` entry; maintained by `CompCtx::change_port_peer`.
    pub(crate) associated_with_peer: bool,
}

/// Entry in a `CompCtx`'s peer list: another component that at least one of
/// the context's ports is currently associated with.
pub struct Peer {
    // ID of the peer component.
    pub id: CompId,
    // Number of local ports associated with this peer. Adjusted by
    // `CompCtx::change_port_peer`, which removes the entry once this reaches 0.
    pub num_associated_ports: u32,
    // Handle to the peer component; `decrement_users` is called on it when the
    // entry is removed from the peer list.
    pub(crate) handle: CompHandle,
}

/// Port and peer management structure. Will keep a local reference counter to
/// the ports associated with peers, additionally manages the atomic reference
/// counter associated with the peers' component handles.
pub struct CompCtx {
    // ID of the component that owns this context.
    pub id: CompId,
    // All ports currently owned by the component; looked up by linear search.
    ports: Vec<Port>,
    // All peers that have at least one associated port.
    peers: Vec<Peer>,
    // Next raw value handed out by `take_port_id` when a new `PortId` is made.
    port_id_counter: u32,
}

/// Handle referring to a port inside a `CompCtx`. Wraps the port's `PortId`;
/// obtained through `CompCtx::get_port_handle` or `CompCtx::add_port`.
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct LocalPortHandle(PortId);

/// Handle referring to a peer inside a `CompCtx`. Wraps the peer's `CompId`;
/// obtained through `CompCtx::get_peer_handle`.
#[derive(Copy, Clone)]
pub struct LocalPeerHandle(CompId);

impl CompCtx {
    /// Builds an empty context (no ports, no peers, port ID counter at zero)
    /// whose component ID is taken from the given reservation in the
    /// component store, so the assigned ID is known up front.
    pub(crate) fn new(reservation: &CompReserved) -> Self {
        Self {
            id: reservation.id(),
            ports: Vec::new(),
            peers: Vec::new(),
            port_id_counter: 0,
        }
    }

    /// Creates a new channel whose two ports are both owned by the component
    /// associated with this context. The putter port is stored before the
    /// getter port, and each port names the other as its peer port.
    pub(crate) fn create_channel(&mut self) -> Channel {
        let putter_id = PortId(self.take_port_id());
        let getter_id = PortId(self.take_port_id());
        let owner_id = self.id;

        // Both ends start in the same pristine state and point at ourselves
        // as the peer component; only the IDs and the direction differ.
        let new_port = |self_id: PortId, peer_port_id: PortId, kind: PortKind| Port {
            self_id,
            peer_comp_id: owner_id,
            peer_port_id,
            kind,
            state: PortState::new(),
            last_instruction: PortInstruction::None,
            received_message_for_sync: false,
            close_at_sync_end: false,
            associated_with_peer: false,
        };

        self.ports.push(new_port(putter_id, getter_id, PortKind::Putter));
        self.ports.push(new_port(getter_id, putter_id, PortKind::Getter));

        Channel { putter_id, getter_id }
    }

    /// Adds a new port with a freshly allocated ID and returns its handle. The
    /// port starts out without a peer association, so make sure to call
    /// `add_peer` afterwards.
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
        let self_id = PortId(self.take_port_id());
        let port = Port {
            self_id,
            peer_comp_id,
            peer_port_id,
            kind,
            state,
            last_instruction: PortInstruction::None,
            received_message_for_sync: false,
            close_at_sync_end: false,
            associated_with_peer: false,
        };
        self.ports.push(port);

        LocalPortHandle(self_id)
    }

    /// Removes a port and hands it back to the caller. Make sure you called
    /// `remove_peer` first: the port must no longer be associated with a peer.
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
        let index = self.must_get_port_index(port_handle);
        let removed = self.ports.remove(index);
        dbg_code!(assert!(!removed.associated_with_peer));
        removed
    }

    /// Changes the peer of a port. Any existing peer association is dropped
    /// first (removing the peer entry, and possibly destroying the peer
    /// component, when this was its last associated port). If a new peer is
    /// given it is stored on the port, and the port is associated with it
    /// unless the peer is this component itself or the port is closed.
    pub(crate) fn change_port_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, new_peer_comp_id: Option<CompId>) {
        let port_index = self.get_port_index(port_handle);
        let port_is_closed = self.ports[port_index].state.is_closed();

        // Step 1: drop the association with the old peer, if there is one.
        if self.ports[port_index].associated_with_peer {
            let port = &mut self.ports[port_index];
            port.associated_with_peer = false;
            let old_peer_id = port.peer_comp_id;

            let peer_index = self.get_peer_index_by_id(old_peer_id).unwrap();
            self.peers[peer_index].num_associated_ports -= 1;

            if self.peers[peer_index].num_associated_ports == 0 {
                // Last port for this peer: forget the peer and release our
                // use of its component handle.
                let mut peer = self.peers.remove(peer_index);
                if let Some(key) = peer.handle.decrement_users() {
                    sched_ctx.runtime.destroy_component(key);
                }
            }
        }

        // Step 2: record the new peer, if any.
        let peer_id = match new_peer_comp_id {
            Some(peer_id) => peer_id,
            None => return,
        };
        self.ports[port_index].peer_comp_id = peer_id;

        // Ports pointing at ourselves, and closed ports, are never counted as
        // associated with a peer.
        if peer_id == self.id || port_is_closed {
            return;
        }
        self.ports[port_index].associated_with_peer = true;

        if let Some(index) = self.get_peer_index_by_id(peer_id) {
            self.peers[index].num_associated_ports += 1;
        } else {
            let handle = sched_ctx.runtime.get_component_public(peer_id);
            self.peers.push(Peer {
                id: peer_id,
                num_associated_ports: 1,
                handle,
            });
        }
    }

    /// Wraps a port ID in a handle. No check is made that the port exists.
    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
        LocalPortHandle(port_id)
    }

    /// Returns the index of the port in the internal port list. Panics if the
    /// handle does not refer to a port of this context.
    // should perhaps be revised, used in main inbox
    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
        self.must_get_port_index(port_handle)
    }

    /// Wraps a component ID in a peer handle. No check is made that the peer
    /// exists.
    pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle {
        LocalPeerHandle(peer_id)
    }

    /// Returns the port the handle refers to. Panics if there is none.
    pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port {
        &self.ports[self.must_get_port_index(port_handle)]
    }

    /// Returns the port the handle refers to, mutably. Panics if there is
    /// none.
    pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port {
        let position = self.must_get_port_index(port_handle);
        &mut self.ports[position]
    }

    /// Returns the port at the given index of the internal port list, mutably.
    /// Panics if the index is out of bounds.
    pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port {
        &mut self.ports[index]
    }

    /// Returns the peer the handle refers to. Panics if there is none.
    pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
        &self.peers[self.must_get_peer_index(peer_handle)]
    }

    /// Returns the peer the handle refers to, mutably. Panics if there is
    /// none.
    pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer {
        let position = self.must_get_peer_index(peer_handle);
        &mut self.peers[position]
    }

    /// Iterates over all ports, in storage order.
    #[inline]
    pub(crate) fn iter_ports(&self) -> impl Iterator<Item=&Port> {
        self.ports.iter()
    }

    /// Iterates mutably over all ports, in storage order.
    #[inline]
    pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator<Item=&mut Port> {
        self.ports.iter_mut()
    }

    /// Iterates over all peers, in storage order.
    #[inline]
    pub(crate) fn iter_peers(&self) -> impl Iterator<Item=&Peer> {
        self.peers.iter()
    }

    /// Number of ports currently stored in this context.
    #[inline]
    pub(crate) fn num_ports(&self) -> usize {
        self.ports.len()
    }

    // -------------------------------------------------------------------------
    // Local utilities
    // -------------------------------------------------------------------------

    /// Linear search for the port with the handle's ID. The port is required
    /// to exist; a miss is treated as unreachable and panics.
    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
        match self.ports.iter().position(|port| port.self_id == handle.0) {
            Some(index) => index,
            None => unreachable!(),
        }
    }

    /// Linear search for the peer with the handle's ID. The peer is required
    /// to exist; a miss is treated as unreachable and panics.
    fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize {
        match self.peers.iter().position(|peer| peer.id == handle.0) {
            Some(index) => index,
            None => unreachable!(),
        }
    }

    /// Linear search for the peer with the given component ID, returning
    /// `None` if no such peer is stored.
    fn get_peer_index_by_id(&self, comp_id: CompId) -> Option<usize> {
        self.peers.iter().position(|peer| peer.id == comp_id)
    }

    /// Hands out the current counter value as the next raw port ID and
    /// advances the counter, wrapping around on overflow.
    fn take_port_id(&mut self) -> u32 {
        let following = self.port_id_counter.wrapping_add(1);
        std::mem::replace(&mut self.port_id_counter, following)
    }
}