diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index f419c322f5e4d68297549ccc137e412065c5c568..b2c3435d5606f217b687597005aec6e3b4e3a262 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -1,15 +1,113 @@ +use std::fmt::{Debug, Formatter, Result as FmtResult}; + use crate::runtime2::scheduler::*; use crate::runtime2::runtime::*; use crate::runtime2::communication::*; +use crate::protocol::ExpressionId; + +/// Helper struct to remember when the last operation on the port took place. +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum PortInstruction { + None, + NoSource, + SourceLocation(ExpressionId), +} + +/// Directionality of a port +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum PortKind { + Putter, + Getter, +} + +/// Bitflags for port +// TODO: Incorporate remaining flags from `Port` struct +#[repr(u32)] +#[derive(Debug, Copy, Clone)] +pub enum PortStateFlag { + Closed = 0x01, // If not closed, then the port is open + BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked + BlockedDueToFullBuffers = 0x04, +} + +#[derive(Copy, Clone)] +pub struct PortState { + flags: u32 +} + +impl PortState { + pub(crate) fn new() -> PortState { + return PortState{ flags: 0 } + } + + // high-level + + #[inline] + pub fn is_open(&self) -> bool { + return !self.is_closed(); + } + + #[inline] + pub fn is_closed(&self) -> bool { + return self.is_set(PortStateFlag::Closed); + } + + #[inline] + pub fn is_blocked(&self) -> bool { + return + self.is_set(PortStateFlag::BlockedDueToPeerChange) || + self.is_set(PortStateFlag::BlockedDueToFullBuffers); + } + + // lower-level utils + #[inline] + pub fn set(&mut self, flag: PortStateFlag) { + self.flags |= flag as u32; + } + + #[inline] + pub fn clear(&mut self, flag: PortStateFlag) { + self.flags &= !(flag as u32); + } + + #[inline] + pub const fn is_set(&self, flag: PortStateFlag) -> bool { + return (self.flags & (flag as u32)) != 0; + } +} + +impl Debug for PortState { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + use PortStateFlag::*; + + let mut s = f.debug_struct("PortState"); + for (flag_name, flag_value) in &[ + ("closed", Closed), + ("blocked_peer_change", BlockedDueToPeerChange), + ("blocked_full_buffers", BlockedDueToFullBuffers) + ] { + s.field(flag_name, &self.is_set(*flag_value)); + } + + return s.finish(); + } +} + #[derive(Debug)] pub struct Port { + // Identifiers pub self_id: PortId, pub peer_comp_id: CompId, // eventually consistent pub peer_port_id: PortId, // eventually consistent + // Generic operating state pub kind: PortKind, pub state: PortState, - #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool, + // State tracking for error detection and error handling + pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors + pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors + pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message + pub(crate) associated_with_peer: bool, } pub struct Peer { @@ -56,17 +154,23 @@ impl CompCtx { self_id: putter_id, peer_port_id: getter_id, kind: PortKind::Putter, - state: PortState::Open, + state: PortState::new(), peer_comp_id: self.id, - #[cfg(debug_assertions)] associated_with_peer: false, + last_instruction: PortInstruction::None, + close_at_sync_end: false, + received_message_for_sync: false, + associated_with_peer: false, }); self.ports.push(Port{ self_id: getter_id, peer_port_id: putter_id, kind: PortKind::Getter, - state: PortState::Open, + state: PortState::new(), peer_comp_id: self.id, - #[cfg(debug_assertions)] associated_with_peer: false, + last_instruction: PortInstruction::None, + close_at_sync_end: false, + received_message_for_sync: false, + associated_with_peer: false, }); return Channel{ putter_id, getter_id }; @@ -77,7 +181,10 @@ impl CompCtx { let self_id = PortId(self.take_port_id()); self.ports.push(Port{ self_id, peer_comp_id, peer_port_id, kind, state, - #[cfg(debug_assertions)] associated_with_peer: false, + last_instruction: PortInstruction::None, + close_at_sync_end: false, + received_message_for_sync: false, + associated_with_peer: false, }); return LocalPortHandle(self_id); } @@ -90,68 +197,55 @@ impl CompCtx { return port; } - /// Adds a new peer. This must be called for every port, no matter the - /// component the channel is connected to. If a `CompHandle` is supplied, - /// then it will be used to add the peer. Otherwise it will be retrieved - /// from the runtime using its ID. - pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) { - let self_id = self.id; - let port = self.get_port_mut(port_handle); - debug_assert_eq!(port.peer_comp_id, peer_comp_id); - dbg_code!(assert!(!port.associated_with_peer)); - if !Self::requires_peer_reference(port, self_id, false) { - return; - } - - dbg_code!(port.associated_with_peer = true); - match self.get_peer_index_by_id(peer_comp_id) { - Some(peer_index) => { - let peer = &mut self.peers[peer_index]; - peer.num_associated_ports += 1; - }, - None => { - let handle = match handle { - Some(handle) => handle.clone(), - None => sched_ctx.runtime.get_component_public(peer_comp_id) - }; - self.peers.push(Peer{ - id: peer_comp_id, - num_associated_ports: 1, - handle, - }); + /// Changes a peer + pub(crate) fn change_port_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, new_peer_comp_id: Option) { + // If port is currently associated with a peer, then remove that peer + let port_index = self.get_port_index(port_handle); + let port = &mut self.ports[port_index]; + let port_is_closed = port.state.is_closed(); + if port.associated_with_peer { + // Remove old peer association + port.associated_with_peer = false; + let peer_comp_id = port.peer_comp_id; + let peer_index = self.get_peer_index_by_id(peer_comp_id).unwrap(); + let peer = &mut self.peers[peer_index]; + + peer.num_associated_ports -= 1; + if peer.num_associated_ports == 0 { + let mut peer = self.peers.remove(peer_index); + if let Some(key) = peer.handle.decrement_users() { + sched_ctx.runtime.destroy_component(key); + } } } - } - - /// Removes a peer associated with a port. - pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId, also_remove_if_closed: bool) { - let self_id = self.id; - let port = self.get_port_mut(port_handle); - debug_assert_eq!(port.peer_comp_id, peer_id); - if !Self::requires_peer_reference(port, self_id, also_remove_if_closed) { - return; - } - dbg_code!(assert!(port.associated_with_peer)); - dbg_code!(port.associated_with_peer = false); - let peer_index = self.get_peer_index_by_id(peer_id).unwrap(); - let peer = &mut self.peers[peer_index]; - peer.num_associated_ports -= 1; - if peer.num_associated_ports == 0 { - let mut peer = self.peers.remove(peer_index); - if let Some(key) = peer.handle.decrement_users() { - debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component - sched_ctx.runtime.destroy_component(key); + // If there is a new peer, then set it as the peer associated with the + // port + if let Some(peer_id) = new_peer_comp_id { + let port = &mut self.ports[port_index]; + port.peer_comp_id = peer_id; + + if peer_id != self.id && !port_is_closed { + port.associated_with_peer = true; + + match self.get_peer_index_by_id(peer_id) { + Some(index) => { + let peer = &mut self.peers[index]; + peer.num_associated_ports += 1; + }, + None => { + let handle = sched_ctx.runtime.get_component_public(peer_id); + self.peers.push(Peer { + id: peer_id, + num_associated_ports: 1, + handle + }) + } + } } } } - pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) { - let port_info = self.get_port_mut(port_handle); - debug_assert_ne!(port_info.state, PortState::Closed); // because then we do not expect to change the state - port_info.state = new_state; - } - pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle { return LocalPortHandle(port_id); } @@ -215,7 +309,7 @@ impl CompCtx { #[inline] fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool { - return (port.state != PortState::Closed || required_if_closed) && port.peer_comp_id != self_id; + return (!port.state.is_closed() || required_if_closed) && port.peer_comp_id != self_id; } fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {