diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs new file mode 100644 index 0000000000000000000000000000000000000000..84c5594683bb234fe23c173307648b1b895590cc --- /dev/null +++ b/src/runtime2/component/component_context.rs @@ -0,0 +1,245 @@ +use crate::runtime2::scheduler::*; +use crate::runtime2::runtime::*; +use crate::runtime2::communication::*; + +#[derive(Debug)] +pub struct Port { + pub self_id: PortId, + pub peer_comp_id: CompId, // eventually consistent + pub peer_port_id: PortId, // eventually consistent + pub kind: PortKind, + pub state: PortState, + #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool, +} + +pub struct Peer { + pub id: CompId, + pub num_associated_ports: u32, + pub(crate) handle: CompHandle, +} + +/// Port and peer management structure. Will keep a local reference counter to +/// the ports associate with peers, additionally manages the atomic reference +/// counter associated with the peers' component handles. +pub struct CompCtx { + pub id: CompId, + ports: Vec, + peers: Vec, + port_id_counter: u32, +} + +#[derive(Copy, Clone)] +pub struct LocalPortHandle(PortId); + +#[derive(Copy, Clone)] +pub struct LocalPeerHandle(CompId); + +impl CompCtx { + /// Creates a new component context based on a reserved entry in the + /// component store. This reservation is used such that we already know our + /// assigned ID. + pub(crate) fn new(reservation: &CompReserved) -> Self { + return Self{ + id: reservation.id(), + ports: Vec::new(), + peers: Vec::new(), + port_id_counter: 0, + } + } + + /// Creates a new channel that is fully owned by the component associated + /// with this context. + pub(crate) fn create_channel(&mut self) -> Channel { + let putter_id = PortId(self.take_port_id()); + let getter_id = PortId(self.take_port_id()); + self.ports.push(Port{ + self_id: putter_id, + peer_port_id: getter_id, + kind: PortKind::Putter, + state: PortState::Open, + peer_comp_id: self.id, + associated_with_peer: false, + }); + self.ports.push(Port{ + self_id: getter_id, + peer_port_id: putter_id, + kind: PortKind::Getter, + state: PortState::Open, + peer_comp_id: self.id, + associated_with_peer: false, + }); + + return Channel{ putter_id, getter_id }; + } + + /// Adds a new port. Make sure to call `add_peer` afterwards. + pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle { + let self_id = PortId(self.take_port_id()); + self.ports.push(Port{ + self_id, peer_comp_id, peer_port_id, kind, state, + #[cfg(debug_assertions)] associated_with_peer: false, + }); + return LocalPortHandle(self_id); + } + + /// Removes a port. Make sure you called `remove_peer` first. + pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port { + let port_index = self.must_get_port_index(port_handle); + let port = self.ports.remove(port_index); + debug_assert!(!port.associated_with_peer); + return port; + } + + /// Adds a new peer. This must be called for every port, no matter the + /// component the channel is connected to. If a `CompHandle` is supplied, + /// then it will be used to add the peer. Otherwise it will be retrieved + /// from the runtime using its ID. + pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) { + let port = self.get_port_mut(port_handle); + debug_assert_eq!(port.peer_comp_id, peer_comp_id); + debug_assert!(!port.associated_with_peer); + if !self.requires_peer_reference(port) { + return; + } + + dbg_code!(port.associated_with_peer = true); + match self.get_peer_index_by_id(peer_comp_id) { + Some(peer_index) => { + let peer = &mut self.peers[peer_index]; + peer.num_associated_ports += 1; + }, + None => { + let handle = match handle { + Some(handle) => handle.clone(), + None => sched_ctx.runtime.get_component_public(peer_comp_id) + }; + self.peers.push(Peer{ + id: peer_comp_id, + num_associated_ports: 1, + handle, + }); + } + } + } + + /// Removes a peer associated with a port. + pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId) { + let port = self.get_port_mut(port_handle); + debug_assert_eq!(port.peer_comp_id, peer_id); + if !self.requires_peer_reference(port) { + return; + } + + debug_assert!(port.associated_with_peer); + dbg_code!(port.associated_with_peer = false); + let peer_index = self.get_peer_index_by_id(peer_id).unwrap(); + let peer = &mut self.peers[peer_index]; + peer.num_associated_ports -= 1; + if peer.num_associated_ports == 0 { + let mut peer = self.peers.remove(peer_index); + if let Some(key) = peer.handle.decrement_users() { + debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component + sched_ctx.runtime.destroy_component(key); + } + } + } + + pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) { + let port_info = self.get_port_mut(port_handle); + debug_assert_ne!(port_info.state, PortState::Closed); // because then we do not expect to change the state + port_info.state = new_state; + } + + pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle { + return LocalPortHandle(port_id); + } + + // should perhaps be revised, used in main inbox + pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize { + return self.must_get_port_index(port_handle); + } + + pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle { + return LocalPeerHandle(peer_id); + } + + pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port { + let index = self.must_get_port_index(port_handle); + return &self.ports[index]; + } + + pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port { + let index = self.must_get_port_index(port_handle); + return &mut self.ports[index]; + } + + pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer { + let index = self.must_get_peer_index(peer_handle); + return &self.peers[index]; + } + + pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer { + let index = self.must_get_peer_index(peer_handle); + return &mut self.peers[index]; + } + + #[inline] + pub(crate) fn iter_ports(&self) -> impl Iterator { + return self.ports.iter(); + } + + #[inline] + pub(crate) fn iter_peers(&self) -> impl Iterator { + return self.peers.iter(); + } + + #[inline] + pub(crate) fn num_ports(&self) -> usize { + return self.ports.len(); + } + + // ------------------------------------------------------------------------- + // Local utilities + // ------------------------------------------------------------------------- + + #[inline] + fn requires_peer_reference(&self, port: &Port) -> bool { + return port.state == PortState::Closed; + } + + fn must_get_port_index(&self, handle: LocalPortHandle) -> usize { + for (index, port) in self.ports.iter().enumerate() { + if port.self_id == handle.0 { + return index; + } + } + + unreachable!() + } + + fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize { + for (index, peer) in self.peers.iter().enumerate() { + if peer.id == handle.0 { + return index; + } + } + + unreachable!() + } + + fn get_peer_index_by_id(&self, comp_id: CompId) -> Option { + for (index, peer) in self.peers.iter().enumerate() { + if peer.id == comp_id { + return Some(index); + } + } + + return None; + } + + fn take_port_id(&mut self) -> u32 { + let port_id = self.port_id_counter; + self.port_id_counter = self.port_id_counter.wrapping_add(1); + return port_id; + } +} \ No newline at end of file