use crate::runtime2::scheduler::*; use crate::runtime2::runtime::*; use crate::runtime2::communication::*; #[derive(Debug)] pub struct Port { pub self_id: PortId, pub peer_comp_id: CompId, // eventually consistent pub peer_port_id: PortId, // eventually consistent pub kind: PortKind, pub state: PortState, #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool, } pub struct Peer { pub id: CompId, pub num_associated_ports: u32, pub(crate) handle: CompHandle, } /// Port and peer management structure. Will keep a local reference counter to /// the ports associate with peers, additionally manages the atomic reference /// counter associated with the peers' component handles. pub struct CompCtx { pub id: CompId, ports: Vec, peers: Vec, port_id_counter: u32, } #[derive(Copy, Clone, PartialEq, Eq)] pub struct LocalPortHandle(PortId); #[derive(Copy, Clone)] pub struct LocalPeerHandle(CompId); impl CompCtx { /// Creates a new component context based on a reserved entry in the /// component store. This reservation is used such that we already know our /// assigned ID. pub(crate) fn new(reservation: &CompReserved) -> Self { return Self{ id: reservation.id(), ports: Vec::new(), peers: Vec::new(), port_id_counter: 0, } } /// Creates a new channel that is fully owned by the component associated /// with this context. pub(crate) fn create_channel(&mut self) -> Channel { let putter_id = PortId(self.take_port_id()); let getter_id = PortId(self.take_port_id()); self.ports.push(Port{ self_id: putter_id, peer_port_id: getter_id, kind: PortKind::Putter, state: PortState::Open, peer_comp_id: self.id, #[cfg(debug_assertions)] associated_with_peer: false, }); self.ports.push(Port{ self_id: getter_id, peer_port_id: putter_id, kind: PortKind::Getter, state: PortState::Open, peer_comp_id: self.id, #[cfg(debug_assertions)] associated_with_peer: false, }); return Channel{ putter_id, getter_id }; } /// Adds a new port. Make sure to call `add_peer` afterwards. pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle { let self_id = PortId(self.take_port_id()); self.ports.push(Port{ self_id, peer_comp_id, peer_port_id, kind, state, #[cfg(debug_assertions)] associated_with_peer: false, }); return LocalPortHandle(self_id); } /// Removes a port. Make sure you called `remove_peer` first. pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port { let port_index = self.must_get_port_index(port_handle); let port = self.ports.remove(port_index); dbg_code!(assert!(!port.associated_with_peer)); return port; } /// Adds a new peer. This must be called for every port, no matter the /// component the channel is connected to. If a `CompHandle` is supplied, /// then it will be used to add the peer. Otherwise it will be retrieved /// from the runtime using its ID. pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) { let self_id = self.id; let port = self.get_port_mut(port_handle); debug_assert_eq!(port.peer_comp_id, peer_comp_id); dbg_code!(assert!(!port.associated_with_peer)); if !Self::requires_peer_reference(port, self_id, false) { return; } dbg_code!(port.associated_with_peer = true); match self.get_peer_index_by_id(peer_comp_id) { Some(peer_index) => { let peer = &mut self.peers[peer_index]; peer.num_associated_ports += 1; }, None => { let handle = match handle { Some(handle) => handle.clone(), None => sched_ctx.runtime.get_component_public(peer_comp_id) }; self.peers.push(Peer{ id: peer_comp_id, num_associated_ports: 1, handle, }); } } } /// Removes a peer associated with a port. pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId, also_remove_if_closed: bool) { let self_id = self.id; let port = self.get_port_mut(port_handle); debug_assert_eq!(port.peer_comp_id, peer_id); if !Self::requires_peer_reference(port, self_id, also_remove_if_closed) { return; } dbg_code!(assert!(port.associated_with_peer)); dbg_code!(port.associated_with_peer = false); let peer_index = self.get_peer_index_by_id(peer_id).unwrap(); let peer = &mut self.peers[peer_index]; peer.num_associated_ports -= 1; if peer.num_associated_ports == 0 { let mut peer = self.peers.remove(peer_index); if let Some(key) = peer.handle.decrement_users() { debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component sched_ctx.runtime.destroy_component(key); } } } pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) { let port_info = self.get_port_mut(port_handle); debug_assert_ne!(port_info.state, PortState::Closed); // because then we do not expect to change the state port_info.state = new_state; } pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle { return LocalPortHandle(port_id); } // should perhaps be revised, used in main inbox pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize { return self.must_get_port_index(port_handle); } pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle { return LocalPeerHandle(peer_id); } pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port { let index = self.must_get_port_index(port_handle); return &self.ports[index]; } pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port { let index = self.must_get_port_index(port_handle); return &mut self.ports[index]; } pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port { return &mut self.ports[index]; } pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer { let index = self.must_get_peer_index(peer_handle); return &self.peers[index]; } pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer { let index = self.must_get_peer_index(peer_handle); return &mut self.peers[index]; } #[inline] pub(crate) fn iter_ports(&self) -> impl Iterator { return self.ports.iter(); } #[inline] pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator { return self.ports.iter_mut(); } #[inline] pub(crate) fn iter_peers(&self) -> impl Iterator { return self.peers.iter(); } #[inline] pub(crate) fn num_ports(&self) -> usize { return self.ports.len(); } // ------------------------------------------------------------------------- // Local utilities // ------------------------------------------------------------------------- #[inline] fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool { return (port.state != PortState::Closed || required_if_closed) && port.peer_comp_id != self_id; } fn must_get_port_index(&self, handle: LocalPortHandle) -> usize { for (index, port) in self.ports.iter().enumerate() { if port.self_id == handle.0 { return index; } } unreachable!() } fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize { for (index, peer) in self.peers.iter().enumerate() { if peer.id == handle.0 { return index; } } unreachable!() } fn get_peer_index_by_id(&self, comp_id: CompId) -> Option { for (index, peer) in self.peers.iter().enumerate() { if peer.id == comp_id { return Some(index); } } return None; } fn take_port_id(&mut self) -> u32 { let port_id = self.port_id_counter; self.port_id_counter = self.port_id_counter.wrapping_add(1); return port_id; } }