Files @ 1bc57ab68e0e
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component_context.rs

1bc57ab68e0e 8.8 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
Max Henger
Merge branch 'feat-builtin-ip' into 'master'

feat: Builtin internet component

See merge request nl-cwi-csy/reowolf!6
use crate::runtime2::scheduler::*;
use crate::runtime2::runtime::*;
use crate::runtime2::communication::*;

/// Bookkeeping for a single port, as stored in a component's `CompCtx`.
#[derive(Debug)]
pub struct Port {
    /// ID of this port itself.
    pub self_id: PortId,
    /// ID of the component recorded as holding the other end of the channel.
    /// For a freshly created channel this is the creating component itself.
    pub peer_comp_id: CompId, // eventually consistent
    /// ID of the port at the other end of the channel.
    pub peer_port_id: PortId, // eventually consistent
    /// Which end of the channel this port is (e.g. `Putter` or `Getter`).
    pub kind: PortKind,
    /// Current state of the port (e.g. `Open` or `Closed`).
    pub state: PortState,
    /// Debug-build-only flag: set once `CompCtx::add_peer` has counted this
    /// port against a peer entry, cleared again by `CompCtx::remove_peer`.
    /// `CompCtx::remove_port` asserts that it is cleared.
    #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool,
}

/// Entry for a peer component that at least one local port is counted
/// against.
pub struct Peer {
    /// ID of the peer component.
    pub id: CompId,
    /// Number of local ports currently counted against this peer. The entry
    /// is removed by `CompCtx::remove_peer` once this drops to zero.
    pub num_associated_ports: u32,
    /// Handle to the peer component. Its user count is decremented when the
    /// entry is removed.
    pub(crate) handle: CompHandle,
}

/// Port and peer management structure. Keeps a local reference counter of
/// the ports associated with each peer, and additionally manages the atomic
/// reference counter associated with the peers' component handles.
pub struct CompCtx {
    /// ID of the component this context belongs to.
    pub id: CompId,
    /// All ports currently held by the component; looked up by linear search
    /// on `Port::self_id`.
    ports: Vec<Port>,
    /// One entry per distinct peer component that ports are counted against.
    peers: Vec<Peer>,
    /// Next value to hand out as a port ID (see `take_port_id`).
    port_id_counter: u32,
}

/// Handle to a port stored in a `CompCtx`. Wraps the port's `PortId`; the
/// context resolves it to a `Port` by searching for the matching `self_id`.
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct LocalPortHandle(PortId);

/// Handle to a peer stored in a `CompCtx`. Wraps the peer's `CompId`; the
/// context resolves it to a `Peer` by searching for the matching `id`.
#[derive(Copy, Clone)]
pub struct LocalPeerHandle(CompId);

impl CompCtx {
    /// Builds an empty context (no ports, no peers, port ID counter at zero)
    /// for a component whose entry in the component store has been reserved.
    /// The reservation supplies the ID that was assigned to the component.
    pub(crate) fn new(reservation: &CompReserved) -> Self {
        Self {
            id: reservation.id(),
            port_id_counter: 0,
            ports: Vec::new(),
            peers: Vec::new(),
        }
    }

    /// Creates a new channel that is fully owned by the component associated
    /// with this context.
    pub(crate) fn create_channel(&mut self) -> Channel {
        let putter_id = PortId(self.take_port_id());
        let getter_id = PortId(self.take_port_id());
        self.ports.push(Port{
            self_id: putter_id,
            peer_port_id: getter_id,
            kind: PortKind::Putter,
            state: PortState::Open,
            peer_comp_id: self.id,
            #[cfg(debug_assertions)] associated_with_peer: false,
        });
        self.ports.push(Port{
            self_id: getter_id,
            peer_port_id: putter_id,
            kind: PortKind::Getter,
            state: PortState::Open,
            peer_comp_id: self.id,
            #[cfg(debug_assertions)] associated_with_peer: false,
        });

        return Channel{ putter_id, getter_id };
    }

    /// Registers a new port under a freshly allocated port ID and returns a
    /// handle to it. The port is not yet counted against any peer, so make
    /// sure to call `add_peer` afterwards.
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
        let new_id = PortId(self.take_port_id());
        let port = Port{
            self_id: new_id,
            peer_comp_id,
            peer_port_id,
            kind,
            state,
            #[cfg(debug_assertions)] associated_with_peer: false,
        };
        self.ports.push(port);

        LocalPortHandle(new_id)
    }

    /// Takes the port out of the context and hands it back to the caller.
    /// `remove_peer` must have been called for the port first; debug builds
    /// assert that the port is no longer associated with a peer.
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
        let index = self.must_get_port_index(port_handle);
        let removed = self.ports.remove(index);
        dbg_code!(assert!(!removed.associated_with_peer));
        removed
    }

    /// Counts the port against its peer component. This must be called for
    /// every port, no matter which component the channel is connected to: the
    /// call is a no-op when the port is closed or when the peer is this
    /// component itself.
    ///
    /// If the peer is already known, only its port counter is incremented.
    /// Otherwise a new peer entry is created, using a clone of the supplied
    /// `CompHandle` if there is one, or else a handle retrieved from the
    /// runtime by the peer's ID.
    pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) {
        let own_id = self.id;
        let port = self.get_port_mut(port_handle);
        debug_assert_eq!(port.peer_comp_id, peer_comp_id);
        dbg_code!(assert!(!port.associated_with_peer));
        if !Self::requires_peer_reference(port, own_id, false) {
            return;
        }
        dbg_code!(port.associated_with_peer = true);

        // Known peer: one more port refers to it, the handle stays as it is.
        if let Some(peer_index) = self.get_peer_index_by_id(peer_comp_id) {
            self.peers[peer_index].num_associated_ports += 1;
            return;
        }

        // Unknown peer: obtain a handle of our own and start counting at one.
        let handle = if let Some(existing) = handle {
            existing.clone()
        } else {
            sched_ctx.runtime.get_component_public(peer_comp_id)
        };
        self.peers.push(Peer{
            id: peer_comp_id,
            num_associated_ports: 1,
            handle,
        });
    }

    /// Stops counting the port against its peer component. Nothing happens
    /// when the peer is this component itself, or when the port is closed and
    /// `also_remove_if_closed` is `false`.
    ///
    /// Once the last port referring to the peer is gone, the peer entry is
    /// dropped and the user count of its handle is decremented. If that
    /// decrement yields a component key, the component is destroyed through
    /// the runtime.
    pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId, also_remove_if_closed: bool) {
        let own_id = self.id;
        let port = self.get_port_mut(port_handle);
        debug_assert_eq!(port.peer_comp_id, peer_id);
        if !Self::requires_peer_reference(port, own_id, also_remove_if_closed) {
            return;
        }
        dbg_code!(assert!(port.associated_with_peer));
        dbg_code!(port.associated_with_peer = false);

        let peer_index = self.get_peer_index_by_id(peer_id).unwrap();
        let remaining = {
            let entry = &mut self.peers[peer_index];
            entry.num_associated_ports -= 1;
            entry.num_associated_ports
        };
        if remaining != 0 {
            // Other ports still refer to this peer.
            return;
        }

        let mut removed = self.peers.remove(peer_index);
        if let Some(key) = removed.handle.decrement_users() {
            debug_assert_ne!(key.downgrade(), own_id); // should be upheld by the code that shuts down a component
            sched_ctx.runtime.destroy_component(key);
        }
    }

    /// Overwrites the state of the given port. Debug builds assert that the
    /// port is not already `Closed`.
    pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) {
        let port = self.get_port_mut(port_handle);
        // A closed port is not expected to change state anymore.
        debug_assert_ne!(port.state, PortState::Closed);
        port.state = new_state;
    }

    /// Wraps a port ID into a handle. No lookup is performed here, so this
    /// does not check that the port exists in this context.
    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
        LocalPortHandle(port_id)
    }

    // should perhaps be revised, used in main inbox
    /// Returns the position of the port within the context's port list.
    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
        self.must_get_port_index(port_handle)
    }

    /// Wraps a component ID into a peer handle. No lookup is performed here,
    /// so this does not check that the peer exists in this context.
    pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle {
        LocalPeerHandle(peer_id)
    }

    /// Resolves a port handle to a shared reference to its port.
    pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port {
        let position = self.must_get_port_index(port_handle);
        &self.ports[position]
    }

    /// Resolves a port handle to a mutable reference to its port.
    pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port {
        let position = self.must_get_port_index(port_handle);
        &mut self.ports[position]
    }

    /// Returns a mutable reference to the port at the given position in the
    /// port list. Panics if the index is out of bounds.
    pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port {
        &mut self.ports[index]
    }

    /// Resolves a peer handle to a shared reference to its peer entry.
    pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
        let position = self.must_get_peer_index(peer_handle);
        &self.peers[position]
    }

    /// Resolves a peer handle to a mutable reference to its peer entry.
    pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer {
        let position = self.must_get_peer_index(peer_handle);
        &mut self.peers[position]
    }

    /// Iterates over all ports in the context by shared reference.
    #[inline]
    pub(crate) fn iter_ports(&self) -> impl Iterator<Item=&Port> {
        self.ports.iter()
    }

    /// Iterates over all ports in the context by mutable reference.
    #[inline]
    pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator<Item=&mut Port> {
        self.ports.iter_mut()
    }

    /// Iterates over all peer entries in the context by shared reference.
    #[inline]
    pub(crate) fn iter_peers(&self) -> impl Iterator<Item=&Peer> {
        self.peers.iter()
    }

    /// Number of ports currently stored in the context.
    #[inline]
    pub(crate) fn num_ports(&self) -> usize {
        self.ports.len()
    }

    // -------------------------------------------------------------------------
    // Local utilities
    // -------------------------------------------------------------------------

    /// Decides whether a port should be counted against a peer entry. A port
    /// whose peer is this component itself never is. Any other port is,
    /// unless it is `Closed` and `required_if_closed` is `false`.
    #[inline]
    fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool {
        if port.peer_comp_id == self_id {
            return false;
        }

        required_if_closed || port.state != PortState::Closed
    }

    /// Finds the position of the port the handle refers to. The port is
    /// required to exist; a handle without a matching port ends up in
    /// `unreachable!()`.
    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
        match self.ports.iter().position(|port| port.self_id == handle.0) {
            Some(index) => index,
            None => unreachable!(),
        }
    }

    /// Finds the position of the peer entry the handle refers to. The peer
    /// is required to exist; a handle without a matching entry ends up in
    /// `unreachable!()`.
    fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize {
        match self.peers.iter().position(|peer| peer.id == handle.0) {
            Some(index) => index,
            None => unreachable!(),
        }
    }

    /// Looks up the position of the peer entry with the given component ID,
    /// or `None` if no such peer is registered.
    fn get_peer_index_by_id(&self, comp_id: CompId) -> Option<usize> {
        self.peers.iter().position(|peer| peer.id == comp_id)
    }

    /// Hands out the current value of the port ID counter and advances the
    /// counter by one, wrapping around on `u32` overflow.
    fn take_port_id(&mut self) -> u32 {
        let current = self.port_id_counter;
        self.port_id_counter = current.wrapping_add(1);
        current
    }
}