diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index b2c3435d5606f217b687597005aec6e3b4e3a262..1d00c2c83fa9b0ae269fe690f659ef94cf4c9e1d 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -14,6 +14,15 @@ pub enum PortInstruction { SourceLocation(ExpressionId), } +impl PortInstruction { + pub fn is_none(&self) -> bool { + match self { + PortInstruction::None => return true, + _ => return false, + } + } +} + /// Directionality of a port #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum PortKind { @@ -29,6 +38,8 @@ pub enum PortStateFlag { Closed = 0x01, // If not closed, then the port is open BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked BlockedDueToFullBuffers = 0x04, + Transmitted = 0x08, // Transmitted, so cannot be used anymore + Received = 0x10, // Received, so cannot be used yet, only after the sync round } #[derive(Copy, Clone)] @@ -48,6 +59,14 @@ impl PortState { return !self.is_closed(); } + #[inline] + pub fn can_send(&self) -> bool { + return + !self.is_set(PortStateFlag::Closed) && + !self.is_set(PortStateFlag::Transmitted) && + !self.is_set(PortStateFlag::Received); + } + #[inline] pub fn is_closed(&self) -> bool { return self.is_set(PortStateFlag::Closed); @@ -60,6 +79,11 @@ impl PortState { self.is_set(PortStateFlag::BlockedDueToFullBuffers); } + #[inline] + pub fn is_blocked_due_to_port_change(&self) -> bool { + return self.is_set(PortStateFlag::BlockedDueToPeerChange); + } + // lower-level utils #[inline] pub fn set(&mut self, flag: PortStateFlag) { @@ -85,7 +109,8 @@ impl Debug for PortState { for (flag_name, flag_value) in &[ ("closed", Closed), ("blocked_peer_change", BlockedDueToPeerChange), - ("blocked_full_buffers", BlockedDueToFullBuffers) + ("blocked_full_buffers", BlockedDueToFullBuffers), + ("transmitted", Transmitted), ] { s.field(flag_name, &self.is_set(*flag_value)); } @@ -176,7 +201,7 @@ impl CompCtx { return Channel{ putter_id, getter_id }; } - /// Adds a new port. Make sure to call `add_peer` afterwards. + /// Adds a new port. Make sure to call `change_peer` afterwards. pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle { let self_id = PortId(self.take_port_id()); self.ports.push(Port{ @@ -189,7 +214,28 @@ impl CompCtx { return LocalPortHandle(self_id); } - /// Removes a port. Make sure you called `remove_peer` first. + /// Adds a self-reference. Called by the runtime/scheduler + pub(crate) fn add_self_reference(&mut self, self_handle: CompHandle) { + debug_assert_eq!(self.id, self_handle.id()); + debug_assert!(self.get_peer_index_by_id(self.id).is_none()); + self.peers.push(Peer{ + id: self.id, + num_associated_ports: 0, + handle: self_handle + }); + } + + /// Removes a self-reference. Called by the runtime/scheduler + pub(crate) fn remove_self_reference(&mut self) -> Option { + let self_index = self.get_peer_index_by_id(self.id).unwrap(); + let peer = &mut self.peers[self_index]; + let maybe_comp_key = peer.handle.decrement_users(); + self.peers.remove(self_index); + + return maybe_comp_key; + } + + /// Removes a port. Make sure you called `change_peer` first. pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port { let port_index = self.must_get_port_index(port_handle); let port = self.ports.remove(port_index); @@ -307,11 +353,6 @@ impl CompCtx { // Local utilities // ------------------------------------------------------------------------- - #[inline] - fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool { - return (!port.state.is_closed() || required_if_closed) && port.peer_comp_id != self_id; - } - fn must_get_port_index(&self, handle: LocalPortHandle) -> usize { for (index, port) in self.ports.iter().enumerate() { if port.self_id == handle.0 {