Files @ ff87427e49f0
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component_context.rs - annotation

ff87427e49f0 12.2 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
Remove unused field from port metadata
1f78496722d1
1f78496722d1
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
38c129959044
38c129959044
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
38c129959044
38c129959044
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
e7df1d2ae35f
e7df1d2ae35f
1f78496722d1
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
1f78496722d1
e7df1d2ae35f
e7df1d2ae35f
1f78496722d1
d81c8519ee2c
1f78496722d1
1f78496722d1
1f78496722d1
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
637115283740
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
1f78496722d1
e7df1d2ae35f
d81c8519ee2c
1f78496722d1
1f78496722d1
1f78496722d1
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
1f78496722d1
e7df1d2ae35f
d81c8519ee2c
1f78496722d1
1f78496722d1
1f78496722d1
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
38c129959044
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
d81c8519ee2c
1f78496722d1
1f78496722d1
1f78496722d1
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
38c129959044
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
11fd959b348a
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
5415acc02756
5415acc02756
5415acc02756
5415acc02756
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
5415acc02756
5415acc02756
5415acc02756
5415acc02756
5415acc02756
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
use std::fmt::{Debug, Formatter, Result as FmtResult};

use crate::runtime2::scheduler::*;
use crate::runtime2::runtime::*;
use crate::runtime2::communication::*;

use crate::protocol::ExpressionId;

/// Helper struct to remember when the last operation on the port took place.
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum PortInstruction {
    None,
    NoSource,
    SourceLocation(ExpressionId),
}

impl PortInstruction {
    pub fn is_none(&self) -> bool {
        match self {
            PortInstruction::None => return true,
            _ => return false,
        }
    }
}

/// Directionality of a port
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PortKind {
    Putter,
    Getter,
}

/// Bitflags for port
// TODO: Incorporate remaining flags from `Port` struct
#[repr(u32)]
#[derive(Debug, Copy, Clone)]
pub enum PortStateFlag {
    Closed = 0x01, // If not closed, then the port is open
    BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked
    BlockedDueToFullBuffers = 0x04,
    Transmitted = 0x08, // Transmitted, so cannot be used anymore
    Received = 0x10, // Received, so cannot be used yet, only after the sync round
}

#[derive(Copy, Clone)]
pub struct PortState {
    flags: u32
}

impl PortState {
    pub(crate) fn new() -> PortState {
        return PortState{ flags: 0 }
    }

    // high-level

    #[inline]
    pub fn is_open(&self) -> bool {
        return !self.is_closed();
    }

    #[inline]
    pub fn can_send(&self) -> bool {
        return
            !self.is_set(PortStateFlag::Closed) &&
            !self.is_set(PortStateFlag::Transmitted) &&
            !self.is_set(PortStateFlag::Received);
    }

    #[inline]
    pub fn is_closed(&self) -> bool {
        return self.is_set(PortStateFlag::Closed);
    }

    #[inline]
    pub fn is_blocked(&self) -> bool {
        return
            self.is_set(PortStateFlag::BlockedDueToPeerChange) ||
            self.is_set(PortStateFlag::BlockedDueToFullBuffers);
    }

    #[inline]
    pub fn is_blocked_due_to_port_change(&self) -> bool {
        return self.is_set(PortStateFlag::BlockedDueToPeerChange);
    }

    // lower-level utils
    #[inline]
    pub fn set(&mut self, flag: PortStateFlag) {
        self.flags |= flag as u32;
    }

    #[inline]
    pub fn clear(&mut self, flag: PortStateFlag) {
        self.flags &= !(flag as u32);
    }

    #[inline]
    pub const fn is_set(&self, flag: PortStateFlag) -> bool {
        return (self.flags & (flag as u32)) != 0;
    }
}

impl Debug for PortState {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        use PortStateFlag::*;

        let mut s = f.debug_struct("PortState");
        for (flag_name, flag_value) in &[
            ("closed", Closed),
            ("blocked_peer_change", BlockedDueToPeerChange),
            ("blocked_full_buffers", BlockedDueToFullBuffers),
            ("transmitted", Transmitted),
        ] {
            s.field(flag_name, &self.is_set(*flag_value));
        }

        return s.finish();
    }
}

#[derive(Debug)]
pub struct Port {
    // Identifiers
    pub self_id: PortId,
    pub peer_comp_id: CompId, // eventually consistent
    pub peer_port_id: PortId, // eventually consistent
    // Generic operating state
    pub kind: PortKind,
    pub state: PortState,
    // State tracking for error detection and error handling
    pub last_registered_round: Option<u32>,
    pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors
    pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message
    pub(crate) associated_with_peer: bool,
}

pub struct Peer {
    pub id: CompId,
    pub num_associated_ports: u32,
    pub(crate) handle: CompHandle,
}

/// Port and peer management structure. Will keep a local reference counter to
/// the ports associate with peers, additionally manages the atomic reference
/// counter associated with the peers' component handles.
pub struct CompCtx {
    pub id: CompId,
    ports: Vec<Port>,
    peers: Vec<Peer>,
    port_id_counter: u32,
}

#[derive(Copy, Clone, PartialEq, Eq)]
pub struct LocalPortHandle(PortId);

#[derive(Copy, Clone)]
pub struct LocalPeerHandle(CompId);

impl CompCtx {
    /// Creates a new component context based on a reserved entry in the
    /// component store. This reservation is used such that we already know our
    /// assigned ID.
    pub(crate) fn new(reservation: &CompReserved) -> Self {
        return Self{
            id: reservation.id(),
            ports: Vec::new(),
            peers: Vec::new(),
            port_id_counter: 0,
        }
    }

    /// Creates a new channel that is fully owned by the component associated
    /// with this context.
    pub(crate) fn create_channel(&mut self) -> Channel {
        let putter_id = PortId(self.take_port_id());
        let getter_id = PortId(self.take_port_id());
        self.ports.push(Port{
            self_id: putter_id,
            peer_port_id: getter_id,
            kind: PortKind::Putter,
            state: PortState::new(),
            peer_comp_id: self.id,
            last_registered_round: None,
            last_instruction: PortInstruction::None,
            close_at_sync_end: false,
            associated_with_peer: false,
        });
        self.ports.push(Port{
            self_id: getter_id,
            peer_port_id: putter_id,
            kind: PortKind::Getter,
            state: PortState::new(),
            peer_comp_id: self.id,
            last_registered_round: None,
            last_instruction: PortInstruction::None,
            close_at_sync_end: false,
            associated_with_peer: false,
        });

        return Channel{ putter_id, getter_id };
    }

    /// Adds a new port. Make sure to call `change_peer` afterwards.
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
        let self_id = PortId(self.take_port_id());
        self.ports.push(Port{
            self_id, peer_comp_id, peer_port_id, kind, state,
            last_registered_round: None,
            last_instruction: PortInstruction::None,
            close_at_sync_end: false,
            associated_with_peer: false,
        });
        return LocalPortHandle(self_id);
    }

    /// Adds a self-reference. Called by the runtime/scheduler
    pub(crate) fn add_self_reference(&mut self, self_handle: CompHandle) {
        debug_assert_eq!(self.id, self_handle.id());
        debug_assert!(self.get_peer_index_by_id(self.id).is_none());
        self.peers.push(Peer{
            id: self.id,
            num_associated_ports: 0,
            handle: self_handle
        });
    }

    /// Removes a self-reference. Called by the runtime/scheduler
    pub(crate) fn remove_self_reference(&mut self) -> Option<CompKey> {
        let self_index = self.get_peer_index_by_id(self.id).unwrap();
        let peer = &mut self.peers[self_index];
        let maybe_comp_key = peer.handle.decrement_users();
        self.peers.remove(self_index);

        return maybe_comp_key;
    }

    /// Removes a port. Make sure you called `change_peer` first.
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
        let port_index = self.must_get_port_index(port_handle);
        let port = self.ports.remove(port_index);
        dbg_code!(assert!(!port.associated_with_peer));
        return port;
    }

    /// Changes a peer
    pub(crate) fn change_port_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, new_peer_comp_id: Option<CompId>) {
        // If port is currently associated with a peer, then remove that peer
        let port_index = self.get_port_index(port_handle);
        let port = &mut self.ports[port_index];
        let port_is_closed = port.state.is_closed();
        if port.associated_with_peer {
            // Remove old peer association
            port.associated_with_peer = false;
            let peer_comp_id = port.peer_comp_id;
            let peer_index = self.get_peer_index_by_id(peer_comp_id).unwrap();
            let peer = &mut self.peers[peer_index];

            peer.num_associated_ports -= 1;
            if peer.num_associated_ports == 0 {
                let mut peer = self.peers.remove(peer_index);
                if let Some(key) = peer.handle.decrement_users() {
                    sched_ctx.runtime.destroy_component(key);
                }
            }
        }

        // If there is a new peer, then set it as the peer associated with the
        // port
        if let Some(peer_id) = new_peer_comp_id {
            let port = &mut self.ports[port_index];
            port.peer_comp_id = peer_id;

            if peer_id != self.id && !port_is_closed {
                port.associated_with_peer = true;

                match self.get_peer_index_by_id(peer_id) {
                    Some(index) => {
                        let peer = &mut self.peers[index];
                        peer.num_associated_ports += 1;
                    },
                    None => {
                        let handle = sched_ctx.runtime.get_component_public(peer_id);
                        self.peers.push(Peer {
                            id: peer_id,
                            num_associated_ports: 1,
                            handle
                        })
                    }
                }
            }
        }
    }

    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
        return LocalPortHandle(port_id);
    }

    // should perhaps be revised, used in main inbox
    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
        return self.must_get_port_index(port_handle);
    }

    pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle {
        return LocalPeerHandle(peer_id);
    }

    pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port {
        let index = self.must_get_port_index(port_handle);
        return &self.ports[index];
    }

    pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port {
        let index = self.must_get_port_index(port_handle);
        return &mut self.ports[index];
    }

    pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port {
        return &mut self.ports[index];
    }

    pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
        let index = self.must_get_peer_index(peer_handle);
        return &self.peers[index];
    }

    pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer {
        let index = self.must_get_peer_index(peer_handle);
        return &mut self.peers[index];
    }

    #[inline]
    pub(crate) fn iter_ports(&self) -> impl Iterator<Item=&Port> {
        return self.ports.iter();
    }

    #[inline]
    pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator<Item=&mut Port> {
        return self.ports.iter_mut();
    }

    #[inline]
    pub(crate) fn iter_peers(&self) -> impl Iterator<Item=&Peer> {
        return self.peers.iter();
    }

    #[inline]
    pub(crate) fn num_ports(&self) -> usize {
        return self.ports.len();
    }

    // -------------------------------------------------------------------------
    // Local utilities
    // -------------------------------------------------------------------------

    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
        for (index, port) in self.ports.iter().enumerate() {
            if port.self_id == handle.0 {
                return index;
            }
        }

        unreachable!()
    }

    fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize {
        for (index, peer) in self.peers.iter().enumerate() {
            if peer.id == handle.0 {
                return index;
            }
        }

        unreachable!()
    }

    fn get_peer_index_by_id(&self, comp_id: CompId) -> Option<usize> {
        for (index, peer) in self.peers.iter().enumerate() {
            if peer.id == comp_id {
                return Some(index);
            }
        }

        return None;
    }

    fn take_port_id(&mut self) -> u32 {
        let port_id = self.port_id_counter;
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
        return port_id;
    }
}