diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 95bc3e85c6f4df6c9a221764f5eb8ad46745159e..c522a7397a83c029a0eabe85233498d0e844c7e0 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,903 +1,640 @@ -/// cbindgen:ignore -mod communication; -/// cbindgen:ignore -mod endpoints; -pub mod error; -/// cbindgen:ignore -mod logging; -/// cbindgen:ignore -mod setup; - -#[cfg(test)] -mod tests; - -use crate::common::*; -use error::*; -use mio::net::UdpSocket; - -/// The interface between the user's application and a communication session, -/// in which the application plays the part of a (native) component. This structure provides the application -/// with functionality available to all components: the ability to add new channels (port pairs), and to -/// instantiate new components whose definitions are defined in the connector's configured protocol -/// description. Native components have the additional ability to add `dangling' ports backed by local/remote -/// IP addresses, to be coupled with a counterpart once the connector's setup is completed by `connect`. -/// This allows sets of applications to cooperate in constructing shared sessions that span the network. -#[derive(Debug)] -pub struct Connector { - unphased: ConnectorUnphased, - phased: ConnectorPhased, -} +// Structure of module -/// Characterizes a type which can write lines of logging text. -/// The implementations provided in the `logging` module are likely to be sufficient, -/// but for added flexibility, users are able to implement their own loggers for use -/// by connectors. -pub trait Logger: Debug + Send + Sync { - fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>; -} +mod branch; +mod native; +mod port; +mod scheduler; +mod consensus; +mod inbox; -/// A logger that appends the logged strings to a growing byte buffer -#[derive(Debug)] -pub struct VecLogger(ConnectorId, Vec); +#[cfg(test)] mod tests; +mod connector; -/// A trivial logger that always returns None, such that no logging information is ever written. -#[derive(Debug)] -pub struct DummyLogger; +// Imports -/// A logger that writes the logged lines to a given file. -#[derive(Debug)] -pub struct FileLogger(ConnectorId, std::fs::File); - -// Interface between protocol state and the connector runtime BEFORE all components -// ave begun their branching speculation. See ComponentState::nonsync_run. -pub(crate) struct NonsyncProtoContext<'a> { - ips: &'a mut IdAndPortState, - logger: &'a mut dyn Logger, - unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase - proto_component_id: ComponentId, // KEY in id->component map -} +use std::collections::VecDeque; +use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::thread::{self, JoinHandle}; -// Interface between protocol state and the connector runtime AFTER all components -// have begun their branching speculation. See ComponentState::sync_run. -pub(crate) struct SyncProtoContext<'a> { - rctx: &'a RoundCtx, - branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch - predicate: &'a Predicate, // KEY in pred->branch map -} +use crate::collections::RawVec; +use crate::ProtocolDescription; -// The data coupled with a particular protocol component branch, but crucially omitting -// the `ComponentState` such that this may be passed by reference to the state with separate -// access control. -#[derive(Default, Debug, Clone)] -struct ProtoComponentBranchInner { - did_put_or_get: HashSet, - inbox: HashMap, -} +use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling}; +use scheduler::{Scheduler, ComponentCtx, SchedulerCtx, ControlMessageHandler}; +use native::{Connector, ConnectorApplication, ApplicationInterface}; +use inbox::Message; +use port::{ChannelId, Port, PortState}; -// A speculative variable that lives for the duration of the synchronous round. -// Each is assigned a value in domain `SpecVal`. -#[derive( - Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] -struct SpecVar(PortId); - -// The codomain of SpecVal. Has two associated constants for values FIRING and SILENT, -// but may also enumerate many more values to facilitate finer-grained nondeterministic branching. -#[derive( - Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] -struct SpecVal(u16); - -// Data associated with a successful synchronous round, retained afterwards such that the -// native component can freely reflect on how it went, reading the messages received at their -// inputs, and reflecting on which of their connector's synchronous batches succeeded. +/// A kind of token that, once obtained, allows mutable access to a connector. +/// We're trying to use move semantics as much as possible: the owner of this +/// key is the only one that may execute the connector's code. #[derive(Debug)] -struct RoundEndedNative { - batch_index: usize, - gotten: HashMap, +pub(crate) struct ConnectorKey { + pub index: u32, // of connector + pub generation: u32, } -// Implementation of a set in terms of a vector (optimized for reading, not writing) -#[derive(Default)] -struct VecSet { - // invariant: ordered, deduplicated - vec: Vec, -} +impl ConnectorKey { + /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable + /// access, to a "regular ID" which can be used to obtain immutable access. + #[inline] + pub fn downcast(&self) -> ConnectorId { + return ConnectorId{ + index: self.index, + generation: self.generation, + }; + } -// Allows a connector to remember how to forward payloads towards the component that -// owns their destination port. `LocalComponent` corresponds with messages for components -// managed by the connector itself (hinting for it to look it up in a local structure), -// whereas the other variants direct the connector to forward the messages over the network. -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -enum Route { - LocalComponent, - NetEndpoint { index: usize }, - UdpEndpoint { index: usize }, + /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it + /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system + #[inline] + pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { + return ConnectorKey{ + index: id.index, + generation: id.generation, + }; + } } -// The outcome of a synchronous round, representing the distributed consensus. -// In the success case, the attached predicate encodes a row in the session's trace table. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -enum Decision { - Failure, // some connector timed out! - Success(Predicate), +/// A kind of token that allows shared access to a connector. Multiple threads +/// may hold this +#[derive(Debug, Copy, Clone)] +pub struct ConnectorId{ + pub index: u32, + pub generation: u32, } -// The type of control messages exchanged between connectors over the network -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum Msg { - SetupMsg(SetupMsg), - CommMsg(CommMsg), +impl PartialEq for ConnectorId { + fn eq(&self, other: &Self) -> bool { + return self.index.eq(&other.index); + } } -// Control messages exchanged during the setup phase only -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum SetupMsg { - MyPortInfo(MyPortInfo), - LeaderWave { wave_leader: ConnectorId }, - LeaderAnnounce { tree_leader: ConnectorId }, - YouAreMyParent, -} +impl Eq for ConnectorId{} -// Control message particular to the communication phase. -// as such, it's annotated with a round_index -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct CommMsg { - round_index: usize, - contents: CommMsgContents, +impl PartialOrd for ConnectorId{ + fn partial_cmp(&self, other: &Self) -> Option { + return self.index.partial_cmp(&other.index) + } } -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum CommMsgContents { - SendPayload(SendPayloadMsg), - CommCtrl(CommCtrlMsg), +impl Ord for ConnectorId{ + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + return self.partial_cmp(other).unwrap(); + } } -// Connector <-> connector control messages for use in the communication phase -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum CommCtrlMsg { - Suggest { suggestion: Decision }, // child->parent - Announce { decision: Decision }, // parent->child -} +impl ConnectorId { + // TODO: Like the other `new_invalid`, maybe remove + #[inline] + pub fn new_invalid() -> ConnectorId { + return ConnectorId { + index: u32::MAX, + generation: 0, + }; + } -// Speculative payload message, communicating the value for the given -// port's message predecated on the given speculative variable assignments. -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct SendPayloadMsg { - predicate: Predicate, - payload: Payload, + #[inline] + pub(crate) fn is_valid(&self) -> bool { + return self.index != u32::MAX; + } } -// Return result of `Predicate::assignment_union`, communicating the contents -// of the predicate which represents the (consistent) union of their mappings, -// if it exists (no variable mapped distinctly by the input predicates) -#[derive(Debug, PartialEq)] -enum AssignmentUnionResult { - FormerNotLatter, - LatterNotFormer, - Equivalent, - New(Predicate), - Nonexistant, +// TODO: Change this, I hate this. But I also don't want to put `public` and +// `router` of `ScheduledConnector` back into `Connector`. The reason I don't +// want `Box` everywhere is because of the v-table overhead. But +// to truly design this properly I need some benchmarks. +pub(crate) enum ConnectorVariant { + UserDefined(ConnectorPDL), + Native(Box), } -// One of two endpoints for a control channel with a connector on either end. -// The underlying transport is TCP, so we use an inbox buffer to allow -// discrete payload receipt. -struct NetEndpoint { - inbox: Vec, - stream: TcpStream, +impl Connector for ConnectorVariant { + fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + match self { + ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx), + ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx), + } + } } -// Datastructure used during the setup phase representing a NetEndpoint TO BE SETUP -#[derive(Debug, Clone)] -struct NetEndpointSetup { - getter_for_incoming: PortId, - sock_addr: SocketAddr, - endpoint_polarity: EndpointPolarity, -} +pub(crate) struct ScheduledConnector { + pub connector: ConnectorVariant, // access by connector + pub ctx: ComponentCtx, + pub public: ConnectorPublic, // accessible by all schedulers and connectors + pub router: ControlMessageHandler, + pub shutting_down: bool, +} + +// ----------------------------------------------------------------------------- +// Runtime +// ----------------------------------------------------------------------------- + +/// Externally facing runtime. +pub struct Runtime { + inner: Arc, +} + +impl Runtime { + pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { + // Setup global state + assert!(num_threads > 0, "need a thread to run connectors"); + let runtime_inner = Arc::new(RuntimeInner{ + protocol_description, + port_counter: AtomicU32::new(0), + connectors: RwLock::new(ConnectorStore::with_capacity(32)), + connector_queue: Mutex::new(VecDeque::with_capacity(32)), + schedulers: Mutex::new(Vec::new()), + scheduler_notifier: Condvar::new(), + active_connectors: AtomicU32::new(0), + active_interfaces: AtomicU32::new(1), // this `Runtime` instance + should_exit: AtomicBool::new(false), + }); + + // Launch threads + { + let mut schedulers = Vec::with_capacity(num_threads as usize); + for thread_index in 0..num_threads { + let cloned_runtime_inner = runtime_inner.clone(); + let thread = thread::Builder::new() + .name(format!("thread-{}", thread_index)) + .spawn(move || { + let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index); + scheduler.run(); + }) + .unwrap(); + + schedulers.push(thread); + } -// Datastructure used during the setup phase representing a UdpEndpoint TO BE SETUP -#[derive(Debug, Clone)] -struct UdpEndpointSetup { - getter_for_incoming: PortId, - local_addr: SocketAddr, - peer_addr: SocketAddr, -} + let mut lock = runtime_inner.schedulers.lock().unwrap(); + *lock = schedulers; + } -// NetEndpoint annotated with the ID of the port that receives payload -// messages received through the endpoint. This approach assumes that NetEndpoints -// DO NOT multiplex port->port channels, and so a mapping such as this is possible. -// As a result, the messages themselves don't need to carry the PortID with them. -#[derive(Debug)] -struct NetEndpointExt { - net_endpoint: NetEndpoint, - getter_for_incoming: PortId, -} + // Return runtime + return Runtime{ inner: runtime_inner }; + } -// Endpoint for a "raw" UDP endpoint. Corresponds to the "Udp Mediator Component" -// described in the literature. -// It acts as an endpoint by receiving messages via the poller etc. (managed by EndpointManager), -// It acts as a native component by managing a (speculative) set of payload messages (an outbox, -// protecting the peer on the other side of the network). -#[derive(Debug)] -struct UdpEndpointExt { - sock: UdpSocket, // already bound and connected - received_this_round: bool, - outgoing_payloads: HashMap, - getter_for_incoming: PortId, -} + /// Returns a new interface through which channels and connectors can be + /// created. + pub fn create_interface(&self) -> ApplicationInterface { + self.inner.increment_active_interfaces(); + let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); + let connector_key = self.inner.create_interface_component(connector); + interface.set_connector_id(connector_key.downcast()); -// Meta-data for the connector: its role in the consensus tree. -#[derive(Debug)] -struct Neighborhood { - parent: Option, - children: VecSet, + // Note that we're not scheduling. That is done by the interface in case + // it is actually needed. + return interface; + } } -// Manages the connector's ID, and manages allocations for connector/port IDs. -#[derive(Debug, Clone)] -struct IdManager { - connector_id: ConnectorId, - port_suffix_stream: U32Stream, - component_suffix_stream: U32Stream, +impl Drop for Runtime { + fn drop(&mut self) { + self.inner.decrement_active_interfaces(); + let mut lock = self.inner.schedulers.lock().unwrap(); + for handle in lock.drain(..) { + handle.join().unwrap(); + } + } } -// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams. -struct IoByteBuffer { - byte_vec: Vec, -} +// ----------------------------------------------------------------------------- +// RuntimeInner +// ----------------------------------------------------------------------------- + +pub(crate) struct RuntimeInner { + // Protocol + pub(crate) protocol_description: ProtocolDescription, + // Regular counter for port IDs + port_counter: AtomicU32, + // Storage of connectors and the work queue + connectors: RwLock, + connector_queue: Mutex>, + schedulers: Mutex>>, + // Conditions to determine whether the runtime can exit + scheduler_notifier: Condvar, // coupled to mutex on `connector_queue`. + // TODO: Figure out if we can simply merge the counters? + active_connectors: AtomicU32, // active connectors (if sleeping, then still considered active) + active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels + should_exit: AtomicBool, +} + +impl RuntimeInner { + // --- Managing the components queued for execution + + /// Wait until there is a connector to run. If there is one, then `Some` + /// will be returned. If there is no more work, then `None` will be + /// returned. + pub(crate) fn wait_for_work(&self) -> Option { + let mut lock = self.connector_queue.lock().unwrap(); + while lock.is_empty() && !self.should_exit.load(Ordering::Acquire) { + lock = self.scheduler_notifier.wait(lock).unwrap(); + } -// A generator of speculative variables. Created on-demand during the synchronous round -// by the IdManager. -#[derive(Debug)] -struct SpecVarStream { - connector_id: ConnectorId, - port_suffix_stream: U32Stream, -} + return lock.pop_front(); + } + + pub(crate) fn push_work(&self, key: ConnectorKey) { + let mut lock = self.connector_queue.lock().unwrap(); + lock.push_back(key); + self.scheduler_notifier.notify_one(); + } + + // --- Creating/using ports + + /// Creates a new port pair. Note that these are stored globally like the + /// connectors are. Ports stored by components belong to those components. + pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) { + use port::{PortIdLocal, PortKind}; + + let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); + let channel_id = ChannelId::new(getter_id); + let putter_id = PortIdLocal::new(getter_id + 1); + let getter_id = PortIdLocal::new(getter_id); + + let getter_port = Port{ + self_id: getter_id, + peer_id: putter_id, + channel_id, + kind: PortKind::Getter, + state: PortState::Open, + peer_connector: creating_connector, + }; + let putter_port = Port{ + self_id: putter_id, + peer_id: getter_id, + channel_id, + kind: PortKind::Putter, + state: PortState::Open, + peer_connector: creating_connector, + }; + + return (getter_port, putter_port); + } + + /// Sends a message directly (without going through the port) to a + /// component. This is slightly less efficient then sending over a port, but + /// might be preferable for some algorithms. If the component was sleeping + /// then it is scheduled for execution. + pub(crate) fn send_message_maybe_destroyed(&self, target_id: ConnectorId, message: Message) -> bool { + let target = { + let mut lock = self.connectors.read().unwrap(); + lock.get(target_id.index) + }; + + // Do a CAS on the number of users. Most common case the component is + // alive and we're the only one sending the message. Note that if we + // finish this block, we're sure that no-one has set the `num_users` + // value to 0. This is essential! When at 0, the component is added to + // the freelist and the generation counter will be incremented. + let mut cur_num_users = 1; + while let Err(old_num_users) = target.num_users.compare_exchange(cur_num_users, cur_num_users + 1, Ordering::SeqCst, Ordering::Acquire) { + if old_num_users == 0 { + // Cannot send message. Whatever the component state is + // (destroyed, at a different generation number, busy being + // destroyed, etc.) we cannot send the message and will not + // modify the component + return false; + } -// Manages the messy state of the various endpoints, pollers, buffers, etc. -#[derive(Debug)] -struct EndpointManager { - // invariants: - // 1. net and udp endpoints are registered with poll with tokens computed with TargetToken::into - // 2. Events is empty - poll: Poll, - events: Events, - delayed_messages: Vec<(usize, Msg)>, - undelayed_messages: Vec<(usize, Msg)>, // ready to yield - net_endpoint_store: EndpointStore, - udp_endpoint_store: EndpointStore, - io_byte_buffer: IoByteBuffer, -} + cur_num_users = old_num_users; + } -// A storage of endpoints, which keeps track of which components have raised -// an event during poll(), signifying that they need to be checked for new incoming data -#[derive(Debug)] -struct EndpointStore { - endpoint_exts: Vec, - polled_undrained: VecSet, -} + // We incremented the counter. But we might still be at the wrong + // generation number. The generation number is a monotonically + // increasing value. Since it only increases when someone gets the + // `num_users` counter to 0, we can simply load the generation number. + let generation = target.generation.load(Ordering::Acquire); + if generation != target_id.generation { + // We're at the wrong generation, so we cannot send the message. + // However, since we incremented the `num_users` counter, the moment + // we decrement it we might be the one that are supposed to handle + // the destruction of the component. Note that all users of the + // component do an increment-followed-by-decrement, we can simply + // do a `fetch_sub`. + let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst); + if old_num_users == 1 { + // We're the one that got the counter to 0, so we're the ones + // that are supposed to handle component exit + self.finish_component_destruction(target_id); + } -// The information associated with a port identifier, designed for local storage. -#[derive(Clone, Debug)] -struct PortInfo { - owner: ComponentId, - peer: Option, - polarity: Polarity, - route: Route, -} + return false; + } -// Similar to `PortInfo`, but designed for communication during the setup procedure. -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct MyPortInfo { - polarity: Polarity, - port: PortId, - owner: ComponentId, -} + // The generation is correct, and since we incremented the `num_users` + // counter we're now sure that we can send the message and it will be + // handled by the receiver + target.connector.public.inbox.insert_message(message); + + // Finally, do the same as above: decrement number of users, if at gets + // to 0 we're the ones who should handle the exit condition. + let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst); + if old_num_users == 1 { + // We're allowed to destroy the component. + self.finish_component_destruction(target_id); + } else { + // Message is sent. If the component is sleeping, then we're sure + // it is not scheduled and it has not initiated the destruction of + // the component (because of the way + // `initiate_component_destruction` does not set sleeping to true). + // So we can safely schedule it. + let should_wake_up = target.connector.public.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_wake_up { + let key = unsafe{ ConnectorKey::from_id(target_id) }; + self.push_work(key); + } + } -// Newtype around port info map, allowing the implementation of some -// useful methods -#[derive(Default, Debug, Clone)] -struct PortInfoMap { - // invariant: self.invariant_preserved() - // `owned` is redundant information, allowing for fast lookup - // of a component's owned ports (which occurs during the sync round a lot) - map: HashMap, - owned: HashMap>, -} + return true + } -// A convenient substructure for containing port info and the ID manager. -// Houses the bulk of the connector's persistent state between rounds. -// It turns out several situations require access to both things. -#[derive(Debug, Clone)] -struct IdAndPortState { - port_info: PortInfoMap, - id_manager: IdManager, -} + /// Sends a message to a particular component, assumed to occur over a port. + /// If the component happened to be sleeping then it will be scheduled for + /// execution. Because of the port management system we may assumed that + /// we're always accessing the component at the right generation number. + pub(crate) fn send_message_assumed_alive(&self, target_id: ConnectorId, message: Message) { + let target = { + let lock = self.connectors.read().unwrap(); + let entry = lock.get(target_id.index); + debug_assert_eq!(entry.generation.load(Ordering::Acquire), target_id.generation); + &mut entry.connector.public + }; -// A component's setup-phase-specific data -#[derive(Debug)] -struct ConnectorCommunication { - round_index: usize, - endpoint_manager: EndpointManager, - neighborhood: Neighborhood, - native_batches: Vec, - round_result: Result, SyncError>, -} + target.inbox.insert_message(message); -// A component's data common to both setup and communication phases -#[derive(Debug)] -struct ConnectorUnphased { - proto_description: Arc, - proto_components: HashMap, - logger: Box, - ips: IdAndPortState, - native_component_id: ComponentId, -} + let should_wake_up = target.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); -// A connector's phase-specific data -#[derive(Debug)] -enum ConnectorPhased { - Setup(Box), - Communication(Box), -} + if should_wake_up { + let key = unsafe{ ConnectorKey::from_id(target_id) }; + self.push_work(key); + } + } -// A connector's setup-phase-specific data -#[derive(Debug)] -struct ConnectorSetup { - net_endpoint_setups: Vec, - udp_endpoint_setups: Vec, -} + // --- Creating/retrieving/destroying components -// A newtype wrapper for a map from speculative variable to speculative value -// A missing mapping corresponds with "unspecified". -#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -struct Predicate { - assigned: BTreeMap, -} + /// Creates an initially sleeping application connector. + fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey { + // Initialize as sleeping, as it will be scheduled by the programmer. + let mut lock = self.connectors.write().unwrap(); + let key = lock.create(ConnectorVariant::Native(Box::new(component)), true); -// Identifies a child of this connector in the _solution tree_. -// Each connector creates its own local solutions for the consensus procedure during `sync`, -// from the solutions of its children. Those children are either locally-managed components, -// (which are leaves in the solution tree), or other connectors reachable through the given -// network endpoint (which are internal nodes in the solution tree). -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -enum SubtreeId { - LocalComponent(ComponentId), - NetEndpoint { index: usize }, -} + self.increment_active_components(); + return key; + } -// An accumulation of the connector's knowledge of all (a) the local solutions its children -// in the solution tree have found, and (b) its own solutions derivable from those of its children. -// This structure starts off each round with an empty set, and accumulates solutions as they are found -// by local components, or received over the network in control messages. -// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to -// say that these sets GROW until the round is over, and all solutions are reset. -#[derive(Debug)] -struct SolutionStorage { - // invariant: old_local U new_local solutions are those that can be created from - // the UNION of one element from each set in `subtree_solution`. - // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated. - old_local: HashSet, // already sent to this connector's parent OR decided - new_local: HashSet, // not yet sent to this connector's parent OR decided - // this pair acts as SubtreeId -> HashSet which is friendlier to iteration - subtree_solutions: Vec>, - subtree_id_to_index: HashMap, -} + /// Creates a new PDL component. This function just creates the component. + /// If you create it initially awake, then you must add it to the work + /// queue. Other aspects of correctness (i.e. setting initial ports) are + /// relinquished to the caller! + pub(crate) fn create_pdl_component(&self, connector: ConnectorPDL, initially_sleeping: bool) -> ConnectorKey { + // Create as not sleeping, as we'll schedule it immediately + let key = { + let mut lock = self.connectors.write().unwrap(); + lock.create(ConnectorVariant::UserDefined(connector), initially_sleeping) + }; -// Stores the transient data of a synchronous round. -// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of -// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx, -// and can be undone if the round fails. -struct RoundCtx { - solution_storage: SolutionStorage, - spec_var_stream: SpecVarStream, - payload_inbox: Vec<(PortId, SendPayloadMsg)>, - deadline: Option, - ips: IdAndPortState, -} + self.increment_active_components(); + return key; + } -// A trait intended to limit the access of the ConnectorUnphased structure -// such that we don't accidentally modify any important component/port data -// while the results of the round are undecided. Why? Any actions during Connector::sync -// are _speculative_ until the round is decided, and we need a safe way of rolling -// back any changes. -trait CuUndecided { - fn logger(&mut self) -> &mut dyn Logger; - fn proto_description(&self) -> &ProtocolDescription; - fn native_component_id(&self) -> ComponentId; - fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription); - fn logger_and_protocol_components( - &mut self, - ) -> (&mut dyn Logger, &mut HashMap); -} + /// Retrieve private access to the component through its key. + #[inline] + pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector { + let entry = { + let lock = self.connectors.read().unwrap(); + lock.get(connector_key.index) + }; + + debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation, "private access to {:?}", connector_key); + return &mut entry.connector; + } + + // --- Managing component destruction + + /// Start component destruction, may only be done by the scheduler that is + /// executing the component. This might not actually destroy the component, + /// since other components might be sending it messages. + fn initiate_component_destruction(&self, connector_key: ConnectorKey) { + // Most of the time no-one will be sending messages, so try + // immediate destruction + let mut lock = self.connectors.write().unwrap(); + let entry = lock.get(connector_key.index); + debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation); + debug_assert_eq!(entry.connector.public.sleeping.load(Ordering::Acquire), false); // not sleeping: caller is executing this component + let old_num_users = entry.num_users.fetch_sub(1, Ordering::SeqCst); + if old_num_users == 1 { + // We just brought the number of users down to 0. Destroy the + // component + entry.connector.public.inbox.clear(); + entry.generation.fetch_add(1, Ordering::SeqCst); + lock.destroy(connector_key); + self.decrement_active_components(); + } + } -// Represents a set of synchronous port operations that the native component -// has described as an "option" for completing during the synchronous rounds. -// Operations contained here succeed together or not at all. -// A native with N=2+ batches are expressing an N-way nondeterministic choice -#[derive(Debug, Default)] -struct NativeBatch { - // invariant: putters' and getters' polarities respected - to_put: HashMap, - to_get: HashSet, -} + fn finish_component_destruction(&self, connector_id: ConnectorId) { + let mut lock = self.connectors.write().unwrap(); + let entry = lock.get(connector_id.index); + debug_assert_eq!(entry.num_users.load(Ordering::Acquire), 0); + let _old_generation = entry.generation.fetch_add(1, Ordering::SeqCst); + debug_assert_eq!(_old_generation, connector_id.generation); -// Parallels a mio::Token type, but more clearly communicates -// the way it identifies the evented structre it corresponds to. -// See runtime/setup for methods converting between TokenTarget and mio::Token -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -enum TokenTarget { - NetEndpoint { index: usize }, - UdpEndpoint { index: usize }, -} + // TODO: In the future we should not only clear out the inbox, but send + // messages back to the senders indicating the messages did not arrive. + entry.connector.public.inbox.clear(); -// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened, -// such that it can know when to continue polling, and when to block. -enum CommRecvOk { - TimeoutWithoutNew, - NewPayloadMsgs, - NewControlMsg { net_index: usize, msg: CommCtrlMsg }, -} -//////////////// -fn err_would_block(err: &std::io::Error) -> bool { - err.kind() == std::io::ErrorKind::WouldBlock -} -impl VecSet { - fn new(mut vec: Vec) -> Self { - // establish the invariant - vec.sort(); - vec.dedup(); - Self { vec } + // Invariant of only one thread being able to handle the internals of + // component is preserved by the fact that only one thread can decrement + // `num_users` to 0. + lock.destroy(unsafe{ ConnectorKey::from_id(connector_id) }); + self.decrement_active_components(); } - fn contains(&self, element: &T) -> bool { - self.vec.binary_search(element).is_ok() - } - // Insert the given element. Returns whether it was already present. - fn insert(&mut self, element: T) -> bool { - match self.vec.binary_search(&element) { - Ok(_) => false, - Err(index) => { - self.vec.insert(index, element); - true + + // --- Managing exit condition + + #[inline] + pub(crate) fn increment_active_interfaces(&self) { + let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst); + debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero + } + + pub(crate) fn decrement_active_interfaces(&self) { + let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst); + debug_assert!(old_num > 0); + if old_num == 1 { // such that active interfaces is now 0 + let num_connectors = self.active_connectors.load(Ordering::Acquire); + if num_connectors == 0 { + self.signal_for_shutdown(); } } } - fn iter(&self) -> std::slice::Iter { - self.vec.iter() - } - fn pop(&mut self) -> Option { - self.vec.pop() - } -} -impl PortInfoMap { - fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator { - self.owned.get(&owner).into_iter().flat_map(HashSet::iter) - } - fn spec_var_for(&self, port: PortId) -> SpecVar { - // Every port maps to a speculative variable - // Two distinct ports map to the same variable - // IFF they are two ends of the same logical channel. - let info = self.map.get(&port).unwrap(); - SpecVar(match info.polarity { - Getter => port, - Putter => info.peer.unwrap(), - }) + + #[inline] + fn increment_active_components(&self) { + let _old_num = self.active_connectors.fetch_add(1, Ordering::SeqCst); } - fn invariant_preserved(&self) -> bool { - // for every port P with some owner O, - // P is in O's owned set - for (port, info) in self.map.iter() { - match self.owned.get(&info.owner) { - Some(set) if set.contains(port) => {} - _ => { - println!("{:#?}\n WITH port {:?}", self, port); - return false; - } - } - } - // for every port P owned by every owner O, - // P's owner is O - for (&owner, set) in self.owned.iter() { - for port in set { - match self.map.get(port) { - Some(info) if info.owner == owner => {} - _ => { - println!("{:#?}\n WITH owner {:?} port {:?}", self, owner, port); - return false; - } - } + + fn decrement_active_components(&self) { + let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst); + debug_assert!(old_num > 0); + if old_num == 1 { // such that we have no more active connectors (for now!) + let num_interfaces = self.active_interfaces.load(Ordering::Acquire); + if num_interfaces == 0 { + self.signal_for_shutdown(); } } - true - } -} -impl SpecVarStream { - fn next(&mut self) -> SpecVar { - let phantom_port: PortId = - Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() } - .into(); - SpecVar(phantom_port) - } -} -impl IdManager { - fn new(connector_id: ConnectorId) -> Self { - Self { - connector_id, - port_suffix_stream: Default::default(), - component_suffix_stream: Default::default(), - } - } - fn new_spec_var_stream(&self) -> SpecVarStream { - // Spec var stream starts where the current port_id stream ends, with gap of SKIP_N. - // This gap is entirely unnecessary (i.e. 0 is fine) - // It's purpose is only to make SpecVars easier to spot in logs. - // E.g. spot the spec var: { v0_0, v1_2, v1_103 } - const SKIP_N: u32 = 100; - let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N); - SpecVarStream { connector_id: self.connector_id, port_suffix_stream } - } - fn new_port_id(&mut self) -> PortId { - Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into() - } - fn new_component_id(&mut self) -> ComponentId { - Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() } - .into() - } -} -impl Drop for Connector { - fn drop(&mut self) { - log!(self.unphased.logger(), "Connector dropping. Goodbye!"); - } -} -// Given a slice of ports, return the first, if any, port is present repeatedly -fn duplicate_port(slice: &[PortId]) -> Option { - let mut vec = Vec::with_capacity(slice.len()); - for port in slice.iter() { - match vec.binary_search(port) { - Err(index) => vec.insert(index, *port), - Ok(_) => return Some(*port), - } - } - None -} -impl Connector { - /// Generate a random connector identifier from the system's source of randomness. - pub fn random_id() -> ConnectorId { - type Bytes8 = [u8; std::mem::size_of::()]; - unsafe { - let mut bytes = std::mem::MaybeUninit::::uninit(); - // getrandom is the canonical crate for a small, secure rng - getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap(); - // safe! representations of all valid Byte8 values are valid ConnectorId values - std::mem::transmute::<_, _>(bytes.assume_init()) - } } - /// Returns true iff the connector is in connected state, i.e., it's setup phase is complete, - /// and it is ready to participate in synchronous rounds of communication. - pub fn is_connected(&self) -> bool { - // If designed for Rust usage, connectors would be exposed as an enum type from the start. - // consequently, this "phased" business would also include connector variants and this would - // get a lot closer to the connector impl. itself. - // Instead, the C-oriented implementation doesn't distinguish connector states as types, - // and distinguish them as enum variants instead - match self.phased { - ConnectorPhased::Setup(..) => false, - ConnectorPhased::Communication(..) => true, + #[inline] + fn signal_for_shutdown(&self) { + debug_assert_eq!(self.active_interfaces.load(Ordering::Acquire), 0); + debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0); + + let _lock = self.connector_queue.lock().unwrap(); + let should_signal = self.should_exit + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_signal { + self.scheduler_notifier.notify_all(); } } +} - /// Enables the connector's current logger to be swapped out for another - pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { - std::mem::swap(&mut self.unphased.logger, &mut new_logger); - new_logger - } +unsafe impl Send for RuntimeInner {} +unsafe impl Sync for RuntimeInner {} - /// Access the connector's current logger - pub fn get_logger(&mut self) -> &mut dyn Logger { - &mut *self.unphased.logger - } +// ----------------------------------------------------------------------------- +// ConnectorStore +// ----------------------------------------------------------------------------- - /// Create a new synchronous channel, returning its ends as a pair of ports, - /// with polarity output, input respectively. Available during either setup/communication phase. - /// # Panics - /// This function panics if the connector's (large) port id space is exhausted. - pub fn new_port_pair(&mut self) -> [PortId; 2] { - let cu = &mut self.unphased; - // adds two new associated ports, related to each other, and exposed to the native - let mut new_cid = || cu.ips.id_manager.new_port_id(); - // allocate two fresh port identifiers - let [o, i] = [new_cid(), new_cid()]; - // store info for each: - // - they are each others' peers - // - they are owned by a local component with id `cid` - // - polarity putter, getter respectively - cu.ips.port_info.map.insert( - o, - PortInfo { - route: Route::LocalComponent, - peer: Some(i), - owner: cu.native_component_id, - polarity: Putter, - }, - ); - cu.ips.port_info.map.insert( - i, - PortInfo { - route: Route::LocalComponent, - peer: Some(o), - owner: cu.native_component_id, - polarity: Getter, - }, - ); - cu.ips - .port_info - .owned - .entry(cu.native_component_id) - .or_default() - .extend([o, i].iter().copied()); - - log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); - [o, i] - } +struct StoreEntry { + connector: ScheduledConnector, + generation: std::sync::atomic::AtomicU32, + num_users: std::sync::atomic::AtomicU32, +} - /// Instantiates a new component for the connector runtime to manage, and passing - /// the given set of ports from the interface of the native component, to that of the - /// newly created component (passing their ownership). - /// # Errors - /// Error is returned if the moved ports are not owned by the native component, - /// if the given component name is not defined in the connector's protocol, - /// the given sequence of ports contains a duplicate port, - /// or if the component is unfit for instantiation with the given port sequence. - /// # Panics - /// This function panics if the connector's (large) component id space is exhausted. - pub fn add_component( - &mut self, - module_name: &[u8], - identifier: &[u8], - ports: &[PortId], - ) -> Result<(), AddComponentError> { - // Check for error cases first before modifying `cu` - use AddComponentError as Ace; - let cu = &self.unphased; - if let Some(port) = duplicate_port(ports) { - return Err(Ace::DuplicatePort(port)); - } - let expected_polarities = cu.proto_description.component_polarities(module_name, identifier)?; - if expected_polarities.len() != ports.len() { - return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() }); - } - for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) { - let info = cu.ips.port_info.map.get(&port).ok_or(Ace::UnknownPort(port))?; - if info.owner != cu.native_component_id { - return Err(Ace::UnknownPort(port)); - } - if info.polarity != expected_polarity { - return Err(Ace::WrongPortPolarity { port, expected_polarity }); - } - } - // No errors! Time to modify `cu` - // create a new component and identifier - let Connector { phased, unphased: cu } = self; - let new_cid = cu.ips.id_manager.new_component_id(); - cu.proto_components.insert(new_cid, cu.proto_description.new_component(module_name, identifier, ports)); - // update the ownership of moved ports - for port in ports.iter() { - match cu.ips.port_info.map.get_mut(port) { - Some(port_info) => port_info.owner = new_cid, - None => unreachable!(), - } - } - if let Some(set) = cu.ips.port_info.owned.get_mut(&cu.native_component_id) { - set.retain(|x| !ports.contains(x)); - } - let moved_port_set: HashSet = ports.iter().copied().collect(); - if let ConnectorPhased::Communication(comm) = phased { - // Preserve invariant: batches only reason about native's ports. - // Remove batch puts/gets for moved ports. - for batch in comm.native_batches.iter_mut() { - batch.to_put.retain(|port, _| !moved_port_set.contains(port)); - batch.to_get.retain(|port| !moved_port_set.contains(port)); - } - } - cu.ips.port_info.owned.insert(new_cid, moved_port_set); - Ok(()) - } +struct ConnectorStore { + // Freelist storage of connectors. Storage should be pointer-stable as + // someone might be mutating the vector while we're executing one of the + // connectors. + entries: RawVec<*mut StoreEntry>, + free: Vec, } -impl Predicate { - #[inline] - pub fn singleton(k: SpecVar, v: SpecVal) -> Self { - Self::default().inserted(k, v) - } - #[inline] - pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self { - self.assigned.insert(k, v); - self - } - // Return true whether `self` is a subset of `maybe_superset` - pub fn assigns_subset(&self, maybe_superset: &Self) -> bool { - for (var, val) in self.assigned.iter() { - match maybe_superset.assigned.get(var) { - Some(val2) if val2 == val => {} - _ => return false, // var unmapped, or mapped differently - } +impl ConnectorStore { + fn with_capacity(capacity: usize) -> Self { + Self { + entries: RawVec::with_capacity(capacity), + free: Vec::with_capacity(capacity), } - // `maybe_superset` mirrored all my assignments! - true } - /// Given the two predicates {self, other}, return that whose - /// assignments are the union of those of both. - fn assignment_union(&self, other: &Self) -> AssignmentUnionResult { - use AssignmentUnionResult as Aur; - // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. - let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; - let [mut s, mut o] = [s_it.next(), o_it.next()]; - // populate lists of assignments in self but not other and vice versa. - // do this by incrementally unfolding the iterators, keeping an eye - // on the ordering between the head elements [s, o]. - // whenever s break, // both iterators are empty - [None, Some(x)] => { - // self's iterator is empty. - // all remaning elements are in other but not self - o_not_s.push(x); - o_not_s.extend(o_it); - break; - } - [Some(x), None] => { - // other's iterator is empty. - // all remaning elements are in self but not other - s_not_o.push(x); - s_not_o.extend(s_it); - break; - } - [Some((sid, sb)), Some((oid, ob))] => { - if sid < oid { - // o is missing this element - s_not_o.push((sid, sb)); - s = s_it.next(); - } else if sid > oid { - // s is missing this element - o_not_s.push((oid, ob)); - o = o_it.next(); - } else if sb != ob { - assert_eq!(sid, oid); - // both predicates assign the variable but differ on the value - // No predicate exists which satisfies both! - return Aur::Nonexistant; - } else { - // both predicates assign the variable to the same value - s = s_it.next(); - o = o_it.next(); - } - } - } - } - // Observed zero inconsistencies. A unified predicate exists... - match [s_not_o.is_empty(), o_not_s.is_empty()] { - [true, true] => Aur::Equivalent, // ... equivalent to both. - [false, true] => Aur::FormerNotLatter, // ... equivalent to self. - [true, false] => Aur::LatterNotFormer, // ... equivalent to other. - [false, false] => { - // ... which is the union of the predicates' assignments but - // is equivalent to neither self nor other. - let mut new = self.clone(); - for (&id, &b) in o_not_s { - new.assigned.insert(id, b); - } - Aur::New(new) - } + /// Directly retrieves an entry. There be dragons here. The `connector` + /// might have its destructor already executed. Accessing it might then lead + /// to memory corruption. + fn get(&self, index: u32) -> &'static mut StoreEntry { + unsafe { + let entry = self.entries.get_mut(index as usize); + return &mut **entry; } } - // Compute the union of the assignments of the two given predicates, if it exists. - // It doesn't exist if there is some value which the predicates assign to different values. - pub(crate) fn union_with(&self, other: &Self) -> Option { - let mut res = self.clone(); - for (&channel_id, &assignment_1) in other.assigned.iter() { - match res.assigned.insert(channel_id, assignment_1) { - Some(assignment_2) if assignment_1 != assignment_2 => return None, - _ => {} + /// Creates a new connector. Caller should ensure ports are set up correctly + /// and the connector is queued for execution if needed. + fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey { + let mut connector = ScheduledConnector { + connector, + ctx: ComponentCtx::new_empty(), + public: ConnectorPublic::new(initially_sleeping), + router: ControlMessageHandler::new(), + shutting_down: false, + }; + + let index; + let key; + + if self.free.is_empty() { + // No free entries, allocate new entry + index = self.entries.len(); + key = ConnectorKey{ + index: index as u32, generation: 0 + }; + connector.ctx.id = key.downcast(); + + let connector = Box::into_raw(Box::new(StoreEntry{ + connector, + generation: AtomicU32::new(0), + num_users: AtomicU32::new(1), + })); + self.entries.push(connector); + } else { + // Free spot available + index = self.free.pop().unwrap(); + + unsafe { + let target = &mut **self.entries.get_mut(index); + std::ptr::write(&mut target.connector as *mut _, connector); + let _old_num_users = target.num_users.fetch_add(1, Ordering::SeqCst); + debug_assert_eq!(_old_num_users, 0); + + let generation = target.generation.load(Ordering::Acquire); + key = ConnectorKey{ index: index as u32, generation }; + target.connector.ctx.id = key.downcast(); } } - Some(res) - } - pub(crate) fn query(&self, var: SpecVar) -> Option { - self.assigned.get(&var).copied() - } -} -impl RoundCtx { - // remove an arbitrary buffered message, along with the ID of the getter who receives it - fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { - self.payload_inbox.pop() + println!("DEBUG [ global store ] Created component at {}", key.index); + return key; } - // buffer a message along with the ID of the getter who receives it - fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) { - self.payload_inbox.push((getter, msg)); - } - - // buffer a message along with the ID of the putter who sent it - fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) { - if let Some(getter) = self.ips.port_info.map.get(&putter).unwrap().peer { - log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter); - self.getter_push(getter, msg); - } else { - log!(cu.logger(), "Putter {:?} has no known peer!", putter); - panic!("Putter {:?} has no known peer!", putter); + /// Destroys a connector. Caller should make sure it is not scheduled for + /// execution. Otherwise one experiences "bad stuff" (tm). + fn destroy(&mut self, key: ConnectorKey) { + unsafe { + let target = self.entries.get_mut(key.index as usize); + (**target).generation.fetch_add(1, Ordering::SeqCst); + std::ptr::drop_in_place(*target); + // Note: but not deallocating! } - } -} -impl Debug for VecSet { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.debug_set().entries(self.vec.iter()).finish() + println!("DEBUG [ global store ] Destroyed component at {}", key.index); + self.free.push(key.index as usize); } } -impl Debug for Predicate { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - struct Assignment<'a>((&'a SpecVar, &'a SpecVal)); - impl Debug for Assignment<'_> { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{:?}={:?}", (self.0).0, (self.0).1) + +impl Drop for ConnectorStore { + fn drop(&mut self) { + // Everything in the freelist already had its destructor called, so only + // has to be deallocated + for free_idx in self.free.iter().copied() { + unsafe { + let memory = self.entries.get_mut(free_idx); + let layout = std::alloc::Layout::for_value(&**memory); + std::alloc::dealloc(*memory as *mut u8, layout); + + // mark as null for the remainder + *memory = std::ptr::null_mut(); } } - f.debug_set().entries(self.assigned.iter().map(Assignment)).finish() - } -} -impl IdParts for SpecVar { - fn id_parts(self) -> (ConnectorId, U32Suffix) { - self.0.id_parts() - } -} -impl Debug for SpecVar { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let (a, b) = self.id_parts(); - write!(f, "v{}_{}", a, b) - } -} -impl SpecVal { - const FIRING: Self = SpecVal(1); - const SILENT: Self = SpecVal(0); - fn is_firing(self) -> bool { - self == Self::FIRING - // all else treated as SILENT - } - fn iter_domain() -> impl Iterator { - (0..).map(SpecVal) - } -} -impl Debug for SpecVal { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - self.0.fmt(f) - } -} -impl Default for IoByteBuffer { - fn default() -> Self { - let mut byte_vec = Vec::with_capacity(Self::CAPACITY); - unsafe { - // safe! this vector is guaranteed to have sufficient capacity - byte_vec.set_len(Self::CAPACITY); - } - Self { byte_vec } - } -} -impl IoByteBuffer { - const CAPACITY: usize = u16::MAX as usize + 1000; - fn as_mut_slice(&mut self) -> &mut [u8] { - self.byte_vec.as_mut_slice() - } -} -impl Debug for IoByteBuffer { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "IoByteBuffer") + // With the deallocated stuff marked as null, clear the remainder that + // is not null + for idx in 0..self.entries.len() { + unsafe { + let memory = *self.entries.get_mut(idx); + if !memory.is_null() { + let _ = Box::from_raw(memory); // take care of deallocation, bit dirty, but meh + } + } + } } -} +} \ No newline at end of file