use std::fmt::{Debug, Formatter, Result as FmtResult}; use crate::runtime2::scheduler::*; use crate::runtime2::runtime::*; use crate::runtime2::communication::*; use crate::protocol::ExpressionId; /// Helper struct to remember when the last operation on the port took place. #[derive(Debug, PartialEq, Copy, Clone)] pub enum PortInstruction { None, NoSource, SourceLocation(ExpressionId), } impl PortInstruction { pub fn is_none(&self) -> bool { match self { PortInstruction::None => return true, _ => return false, } } } /// Directionality of a port #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum PortKind { Putter, Getter, } /// Bitflags for port // TODO: Incorporate remaining flags from `Port` struct #[repr(u32)] #[derive(Debug, Copy, Clone)] pub enum PortStateFlag { Closed = 0x01, // If not closed, then the port is open BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked BlockedDueToFullBuffers = 0x04, Transmitted = 0x08, // Transmitted, so cannot be used anymore Received = 0x10, // Received, so cannot be used yet, only after the sync round } #[derive(Copy, Clone)] pub struct PortState { flags: u32 } impl PortState { pub(crate) fn new() -> PortState { return PortState{ flags: 0 } } // high-level #[inline] pub fn is_open(&self) -> bool { return !self.is_closed(); } #[inline] pub fn can_send(&self) -> bool { return !self.is_set(PortStateFlag::Closed) && !self.is_set(PortStateFlag::Transmitted) && !self.is_set(PortStateFlag::Received); } #[inline] pub fn is_closed(&self) -> bool { return self.is_set(PortStateFlag::Closed); } #[inline] pub fn is_blocked(&self) -> bool { return self.is_set(PortStateFlag::BlockedDueToPeerChange) || self.is_set(PortStateFlag::BlockedDueToFullBuffers); } #[inline] pub fn is_blocked_due_to_port_change(&self) -> bool { return self.is_set(PortStateFlag::BlockedDueToPeerChange); } // lower-level utils #[inline] pub fn set(&mut self, flag: PortStateFlag) { self.flags |= flag as u32; } #[inline] pub fn clear(&mut self, flag: PortStateFlag) { self.flags &= !(flag as u32); } #[inline] pub const fn is_set(&self, flag: PortStateFlag) -> bool { return (self.flags & (flag as u32)) != 0; } } impl Debug for PortState { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { use PortStateFlag::*; let mut s = f.debug_struct("PortState"); for (flag_name, flag_value) in &[ ("closed", Closed), ("blocked_peer_change", BlockedDueToPeerChange), ("blocked_full_buffers", BlockedDueToFullBuffers), ("transmitted", Transmitted), ] { s.field(flag_name, &self.is_set(*flag_value)); } return s.finish(); } } #[derive(Debug)] pub struct Port { // Identifiers pub self_id: PortId, pub peer_comp_id: CompId, // eventually consistent pub peer_port_id: PortId, // eventually consistent // Generic operating state pub kind: PortKind, pub state: PortState, // State tracking for error detection and error handling pub last_registered_round: Option, pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message pub(crate) associated_with_peer: bool, } pub struct Peer { pub id: CompId, pub num_associated_ports: u32, pub(crate) handle: CompHandle, } /// Port and peer management structure. Will keep a local reference counter to /// the ports associate with peers, additionally manages the atomic reference /// counter associated with the peers' component handles. pub struct CompCtx { pub id: CompId, ports: Vec, peers: Vec, port_id_counter: u32, } #[derive(Copy, Clone, PartialEq, Eq)] pub struct LocalPortHandle(PortId); #[derive(Copy, Clone)] pub struct LocalPeerHandle(CompId); impl CompCtx { /// Creates a new component context based on a reserved entry in the /// component store. This reservation is used such that we already know our /// assigned ID. pub(crate) fn new(reservation: &CompReserved) -> Self { return Self{ id: reservation.id(), ports: Vec::new(), peers: Vec::new(), port_id_counter: 0, } } /// Creates a new channel that is fully owned by the component associated /// with this context. pub(crate) fn create_channel(&mut self) -> Channel { let putter_id = PortId(self.take_port_id()); let getter_id = PortId(self.take_port_id()); self.ports.push(Port{ self_id: putter_id, peer_port_id: getter_id, kind: PortKind::Putter, state: PortState::new(), peer_comp_id: self.id, last_registered_round: None, last_instruction: PortInstruction::None, close_at_sync_end: false, associated_with_peer: false, }); self.ports.push(Port{ self_id: getter_id, peer_port_id: putter_id, kind: PortKind::Getter, state: PortState::new(), peer_comp_id: self.id, last_registered_round: None, last_instruction: PortInstruction::None, close_at_sync_end: false, associated_with_peer: false, }); return Channel{ putter_id, getter_id }; } /// Adds a new port. Make sure to call `change_peer` afterwards. pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle { let self_id = PortId(self.take_port_id()); self.ports.push(Port{ self_id, peer_comp_id, peer_port_id, kind, state, last_registered_round: None, last_instruction: PortInstruction::None, close_at_sync_end: false, associated_with_peer: false, }); return LocalPortHandle(self_id); } /// Adds a self-reference. Called by the runtime/scheduler pub(crate) fn add_self_reference(&mut self, self_handle: CompHandle) { debug_assert_eq!(self.id, self_handle.id()); debug_assert!(self.get_peer_index_by_id(self.id).is_none()); self.peers.push(Peer{ id: self.id, num_associated_ports: 0, handle: self_handle }); } /// Removes a self-reference. Called by the runtime/scheduler pub(crate) fn remove_self_reference(&mut self) -> Option { let self_index = self.get_peer_index_by_id(self.id).unwrap(); let peer = &mut self.peers[self_index]; let maybe_comp_key = peer.handle.decrement_users(); self.peers.remove(self_index); return maybe_comp_key; } /// Removes a port. Make sure you called `change_peer` first. pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port { let port_index = self.must_get_port_index(port_handle); let port = self.ports.remove(port_index); dbg_code!(assert!(!port.associated_with_peer)); return port; } /// Changes a peer pub(crate) fn change_port_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, new_peer_comp_id: Option) { // If port is currently associated with a peer, then remove that peer let port_index = self.get_port_index(port_handle); let port = &mut self.ports[port_index]; let port_is_closed = port.state.is_closed(); if port.associated_with_peer { // Remove old peer association port.associated_with_peer = false; let peer_comp_id = port.peer_comp_id; let peer_index = self.get_peer_index_by_id(peer_comp_id).unwrap(); let peer = &mut self.peers[peer_index]; peer.num_associated_ports -= 1; if peer.num_associated_ports == 0 { let mut peer = self.peers.remove(peer_index); if let Some(key) = peer.handle.decrement_users() { sched_ctx.runtime.destroy_component(key); } } } // If there is a new peer, then set it as the peer associated with the // port if let Some(peer_id) = new_peer_comp_id { let port = &mut self.ports[port_index]; port.peer_comp_id = peer_id; if peer_id != self.id && !port_is_closed { port.associated_with_peer = true; match self.get_peer_index_by_id(peer_id) { Some(index) => { let peer = &mut self.peers[index]; peer.num_associated_ports += 1; }, None => { let handle = sched_ctx.runtime.get_component_public(peer_id); self.peers.push(Peer { id: peer_id, num_associated_ports: 1, handle }) } } } } } pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle { return LocalPortHandle(port_id); } // should perhaps be revised, used in main inbox pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize { return self.must_get_port_index(port_handle); } pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle { return LocalPeerHandle(peer_id); } pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port { let index = self.must_get_port_index(port_handle); return &self.ports[index]; } pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port { let index = self.must_get_port_index(port_handle); return &mut self.ports[index]; } pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port { return &mut self.ports[index]; } pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer { let index = self.must_get_peer_index(peer_handle); return &self.peers[index]; } pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer { let index = self.must_get_peer_index(peer_handle); return &mut self.peers[index]; } #[inline] pub(crate) fn iter_ports(&self) -> impl Iterator { return self.ports.iter(); } #[inline] pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator { return self.ports.iter_mut(); } #[inline] pub(crate) fn iter_peers(&self) -> impl Iterator { return self.peers.iter(); } #[inline] pub(crate) fn num_ports(&self) -> usize { return self.ports.len(); } // ------------------------------------------------------------------------- // Local utilities // ------------------------------------------------------------------------- fn must_get_port_index(&self, handle: LocalPortHandle) -> usize { for (index, port) in self.ports.iter().enumerate() { if port.self_id == handle.0 { return index; } } unreachable!() } fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize { for (index, peer) in self.peers.iter().enumerate() { if peer.id == handle.0 { return index; } } unreachable!() } fn get_peer_index_by_id(&self, comp_id: CompId) -> Option { for (index, peer) in self.peers.iter().enumerate() { if peer.id == comp_id { return Some(index); } } return None; } fn take_port_id(&mut self) -> u32 { let port_id = self.port_id_counter; self.port_id_counter = self.port_id_counter.wrapping_add(1); return port_id; } }