Files @ 4da95131e7f4
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component_context.rs - annotation

4da95131e7f4 8.8 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
WIP on docs on inside-sync errors
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
637115283740
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
11fd959b348a
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
11fd959b348a
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
11fd959b348a
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
5415acc02756
e7df1d2ae35f
e7df1d2ae35f
11fd959b348a
42785e82880a
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
42785e82880a
5415acc02756
e7df1d2ae35f
e7df1d2ae35f
42785e82880a
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
11fd959b348a
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
5415acc02756
5415acc02756
5415acc02756
5415acc02756
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
5415acc02756
5415acc02756
5415acc02756
5415acc02756
5415acc02756
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
42785e82880a
42785e82880a
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
use crate::runtime2::scheduler::*;
use crate::runtime2::runtime::*;
use crate::runtime2::communication::*;

/// Bookkeeping entry for a single port held by a component; instances are
/// stored in the port list of `CompCtx`.
#[derive(Debug)]
pub struct Port {
    /// Identifier of this port. `LocalPortHandle`s are resolved by comparing
    /// against this field.
    pub self_id: PortId,
    /// Component that `CompCtx::add_peer`/`CompCtx::remove_peer` treat as the
    /// peer of this port. For channels made by `CompCtx::create_channel` this
    /// is the owning component's own ID.
    pub peer_comp_id: CompId, // eventually consistent
    /// Identifier of the port at the other end of the channel.
    pub peer_port_id: PortId, // eventually consistent
    /// Which end of the channel this port is (`create_channel` produces one
    /// `Putter` and one `Getter`).
    pub kind: PortKind,
    /// Current state of the port. Unless explicitly requested, ports in the
    /// `Closed` state are skipped by the peer bookkeeping in `CompCtx`.
    pub state: PortState,
    /// Debug-build-only flag: switched on in `CompCtx::add_peer` and off in
    /// `CompCtx::remove_peer`, and asserted on in both as well as in
    /// `CompCtx::remove_port` to catch unbalanced calls.
    #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool,
}

/// Entry for another component that at least one local port is associated
/// with. Maintained by `CompCtx::add_peer` and `CompCtx::remove_peer`.
pub struct Peer {
    /// ID of the peer component.
    pub id: CompId,
    /// Number of local ports currently associated with this peer. Raised by
    /// `CompCtx::add_peer` and lowered by `CompCtx::remove_peer`; the entry is
    /// dropped from the peer list once this reaches zero.
    pub num_associated_ports: u32,
    /// Handle to the peer component. `CompCtx::remove_peer` calls
    /// `decrement_users` on it when the entry is dropped.
    pub(crate) handle: CompHandle,
}

/// Port and peer management structure. Will keep a local reference counter to
/// the ports associated with peers, additionally manages the atomic reference
/// counter associated with the peers' component handles.
pub struct CompCtx {
    /// ID of the component this context belongs to.
    pub id: CompId,
    /// All ports currently held by the component. Looked up by linear search
    /// on `Port::self_id`.
    ports: Vec<Port>,
    /// Peer components referenced by at least one port in `ports`, at most one
    /// entry per peer ID.
    peers: Vec<Peer>,
    /// Next raw value to hand out as a port ID; advanced (with wrap-around)
    /// every time a port ID is taken.
    port_id_counter: u32,
}

/// Handle to a port stored in a `CompCtx`. It merely wraps the port's ID;
/// `CompCtx` resolves it by searching its port list for a matching
/// `Port::self_id`.
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct LocalPortHandle(PortId);

/// Handle to a peer stored in a `CompCtx`. It merely wraps the peer's
/// component ID; `CompCtx` resolves it by searching its peer list for a
/// matching `Peer::id`.
#[derive(Copy, Clone)]
pub struct LocalPeerHandle(CompId);

impl CompCtx {
    /// Builds an empty context (no ports, no peers, port ID counter at zero)
    /// for a component. The component's ID is taken from the supplied
    /// component store reservation, so it is known before the component itself
    /// exists.
    pub(crate) fn new(reservation: &CompReserved) -> Self {
        let id = reservation.id();
        Self {
            id,
            ports: Vec::new(),
            peers: Vec::new(),
            port_id_counter: 0,
        }
    }

    /// Creates a new channel that is fully owned by the component associated
    /// with this context.
    pub(crate) fn create_channel(&mut self) -> Channel {
        let putter_id = PortId(self.take_port_id());
        let getter_id = PortId(self.take_port_id());
        self.ports.push(Port{
            self_id: putter_id,
            peer_port_id: getter_id,
            kind: PortKind::Putter,
            state: PortState::Open,
            peer_comp_id: self.id,
            #[cfg(debug_assertions)] associated_with_peer: false,
        });
        self.ports.push(Port{
            self_id: getter_id,
            peer_port_id: putter_id,
            kind: PortKind::Getter,
            state: PortState::Open,
            peer_comp_id: self.id,
            #[cfg(debug_assertions)] associated_with_peer: false,
        });

        return Channel{ putter_id, getter_id };
    }

    /// Registers a new port under a freshly allocated port ID and returns a
    /// handle to it. The port starts out without a peer association, so follow
    /// up with a call to `add_peer`.
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
        let new_handle = LocalPortHandle(PortId(self.take_port_id()));
        let entry = Port{
            self_id: new_handle.0,
            peer_comp_id,
            peer_port_id,
            kind,
            state,
            #[cfg(debug_assertions)] associated_with_peer: false,
        };
        self.ports.push(entry);

        new_handle
    }

    /// Takes the port out of the port list and hands it to the caller. The
    /// peer association must have been released through `remove_peer`
    /// beforehand; this is checked through `dbg_code!`.
    ///
    /// Note that removal shifts the positions of all later ports, so indices
    /// obtained earlier through `get_port_index` may no longer be valid.
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
        let index = self.must_get_port_index(port_handle);
        let removed = self.ports.remove(index);
        dbg_code!(assert!(!removed.associated_with_peer));

        removed
    }

    /// Associates the port with its peer component. Call this for every port,
    /// regardless of which component sits at the other end: ports that need no
    /// peer entry (closed ports, and ports whose peer is this component
    /// itself) are filtered out here.
    ///
    /// If the peer is already known its port counter is incremented. Otherwise
    /// a new peer entry is created, using a clone of `handle` when one is
    /// supplied and a handle retrieved from the runtime by ID when not.
    pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) {
        let own_id = self.id;
        let port = self.get_port_mut(port_handle);
        debug_assert_eq!(port.peer_comp_id, peer_comp_id);
        dbg_code!(assert!(!port.associated_with_peer));

        let needs_peer = Self::requires_peer_reference(port, own_id, false);
        if !needs_peer {
            return;
        }
        dbg_code!(port.associated_with_peer = true);

        // Known peer: one more port refers to it, the handle stays as it is.
        if let Some(existing_index) = self.get_peer_index_by_id(peer_comp_id) {
            self.peers[existing_index].num_associated_ports += 1;
            return;
        }

        // First port referring to this peer: we need a handle of our own.
        let handle = match handle {
            Some(provided) => provided.clone(),
            None => sched_ctx.runtime.get_component_public(peer_comp_id),
        };
        self.peers.push(Peer{
            id: peer_comp_id,
            num_associated_ports: 1,
            handle,
        });
    }

    /// Releases the association between the port and its peer component.
    ///
    /// Ports whose peer is this component itself are ignored. Closed ports are
    /// ignored as well unless `also_remove_if_closed` is set. When the last
    /// port referring to the peer is released, the peer entry is dropped and
    /// the user count of its handle is decremented; if that yields a key, the
    /// peer component is destroyed through the runtime.
    pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId, also_remove_if_closed: bool) {
        let own_id = self.id;
        let port = self.get_port_mut(port_handle);
        debug_assert_eq!(port.peer_comp_id, peer_id);

        let holds_peer = Self::requires_peer_reference(port, own_id, also_remove_if_closed);
        if !holds_peer {
            return;
        }
        dbg_code!(assert!(port.associated_with_peer));
        dbg_code!(port.associated_with_peer = false);

        let peer_index = self.get_peer_index_by_id(peer_id).unwrap();
        let remaining = {
            let entry = &mut self.peers[peer_index];
            entry.num_associated_ports -= 1;
            entry.num_associated_ports
        };
        if remaining != 0 {
            // Other ports still refer to this peer
            return;
        }

        let mut released = self.peers.remove(peer_index);
        if let Some(key) = released.handle.decrement_users() {
            // should be upheld by the code that shuts down a component
            debug_assert_ne!(key.downgrade(), self.id);
            sched_ctx.runtime.destroy_component(key);
        }
    }

    /// Overwrites the state of the given port. Debug builds assert that the
    /// port is not already `Closed`.
    pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) {
        let port = self.get_port_mut(port_handle);
        // A closed port is not expected to change state anymore
        debug_assert_ne!(port.state, PortState::Closed);
        port.state = new_state;
    }

    /// Wraps a port ID in a `LocalPortHandle`. No lookup takes place, so the
    /// ID is not checked against the ports held by this context; using a
    /// handle for an unknown port panics in the accessor that resolves it.
    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
        LocalPortHandle(port_id)
    }

    /// Returns the current position of the port in the internal port list,
    /// for use with `get_port_by_index_mut`. Panics if the handle does not
    /// match any port.
    // should perhaps be revised, used in main inbox
    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
        self.must_get_port_index(port_handle)
    }

    /// Wraps a component ID in a `LocalPeerHandle`. No lookup takes place, so
    /// the ID is not checked against the peers known to this context; using a
    /// handle for an unknown peer panics in the accessor that resolves it.
    pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle {
        LocalPeerHandle(peer_id)
    }

    /// Shared access to the port behind the handle. Panics if the handle does
    /// not match any port.
    pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port {
        &self.ports[self.must_get_port_index(port_handle)]
    }

    /// Exclusive access to the port behind the handle. Panics if the handle
    /// does not match any port.
    pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port {
        let position = self.must_get_port_index(port_handle);
        &mut self.ports[position]
    }

    /// Exclusive access to the port at the given position in the port list
    /// (see `get_port_index`). Panics if the index is out of bounds.
    pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port {
        &mut self.ports[index]
    }

    /// Shared access to the peer behind the handle. Panics if the handle does
    /// not match any peer.
    pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
        &self.peers[self.must_get_peer_index(peer_handle)]
    }

    /// Exclusive access to the peer behind the handle. Panics if the handle
    /// does not match any peer.
    pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer {
        let position = self.must_get_peer_index(peer_handle);
        &mut self.peers[position]
    }

    /// Iterates over all ports of this component, in storage order.
    #[inline]
    pub(crate) fn iter_ports(&self) -> impl Iterator<Item=&Port> {
        self.ports.iter()
    }

    /// Iterates over all ports of this component with exclusive access, in
    /// storage order.
    #[inline]
    pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator<Item=&mut Port> {
        self.ports.iter_mut()
    }

    /// Iterates over all peers currently referenced by this component, in
    /// storage order.
    #[inline]
    pub(crate) fn iter_peers(&self) -> impl Iterator<Item=&Peer> {
        self.peers.iter()
    }

    /// Number of ports currently held by this component.
    #[inline]
    pub(crate) fn num_ports(&self) -> usize {
        self.ports.len()
    }

    // -------------------------------------------------------------------------
    // Local utilities
    // -------------------------------------------------------------------------

    /// Decides whether a port takes part in the peer bookkeeping. A port whose
    /// peer is this component itself never does. Any other port does, unless
    /// it is `Closed` and `required_if_closed` is not set.
    #[inline]
    fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool {
        if port.peer_comp_id == self_id {
            return false;
        }

        required_if_closed || port.state != PortState::Closed
    }

    /// Resolves a port handle to the position of its port in `self.ports` by
    /// linear search on the port ID.
    ///
    /// # Panics
    ///
    /// Panics if no port with that ID is held by this context. Handles are
    /// created by `get_port_handle` without validation, so a stale or foreign
    /// port ID ends up here; the panic message names the offending port and
    /// the component to make such a bug traceable.
    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
        match self.ports.iter().position(|port| port.self_id == handle.0) {
            Some(index) => index,
            None => unreachable!(
                "port {:?} is not held by component {:?}",
                handle.0, self.id
            ),
        }
    }

    /// Resolves a peer handle to the position of its entry in `self.peers` by
    /// linear search on the component ID.
    ///
    /// # Panics
    ///
    /// Panics if no peer with that ID is known to this context. Handles are
    /// created by `get_peer_handle` without validation, so a stale or foreign
    /// component ID ends up here; the panic message names the offending peer
    /// and the component to make such a bug traceable.
    fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize {
        match self.peers.iter().position(|peer| peer.id == handle.0) {
            Some(index) => index,
            None => unreachable!(
                "peer {:?} is not known to component {:?}",
                handle.0, self.id
            ),
        }
    }

    /// Position of the peer entry with the given component ID in
    /// `self.peers`, or `None` if this context holds no entry for it.
    fn get_peer_index_by_id(&self, comp_id: CompId) -> Option<usize> {
        self.peers.iter().position(|peer| peer.id == comp_id)
    }

    /// Hands out the current value of the port ID counter and advances the
    /// counter by one. The increment wraps around on overflow instead of
    /// panicking.
    fn take_port_id(&mut self) -> u32 {
        let next_counter = self.port_id_counter.wrapping_add(1);
        std::mem::replace(&mut self.port_id_counter, next_counter)
    }
}