diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs
index cd8b2ab381c4e34b954794afae1a6d598616d5ad..060feaa2aaff8b148d35d7ab0b554120895a197e 100644
--- a/src/runtime2/mod.rs
+++ b/src/runtime2/mod.rs
@@ -29,6 +29,7 @@ use port::{ChannelId, Port, PortState};
 /// A kind of token that, once obtained, allows mutable access to a connector.
 /// We're trying to use move semantics as much as possible: the owner of this
 /// key is the only one that may execute the connector's code.
+#[derive(Debug)]
 pub(crate) struct ConnectorKey {
     pub index: u32, // of connector
     pub generation: u32,
@@ -273,10 +274,98 @@ impl RuntimeInner {
         return (getter_port, putter_port);
     }
 
-    /// Sends a message to a particular connector. If the connector happened to
-    /// be sleeping then it will be scheduled for execution.
-    pub(crate) fn send_message(&self, target_id: ConnectorId, message: Message) {
-        let target = self.get_component_public(target_id);
+    /// Sends a message directly (without going through a port) to a
+    /// component. This is slightly less efficient than sending over a port,
+    /// but might be preferable for some algorithms. If the component was
+    /// sleeping then it is scheduled for execution.
+    pub(crate) fn send_message_maybe_destroyed(&self, target_id: ConnectorId, message: Message) -> bool {
+        let target = {
+            let lock = self.connectors.read().unwrap();
+            lock.get(target_id.index)
+        };
+
+        // Do a CAS on the number of users. In the common case the component is
+        // alive and we're the only one sending the message. Note that if we
+        // finish this block, we're sure that no-one has set the `num_users`
+        // value to 0. This is essential! When at 0, the component is added to
+        // the freelist and the generation counter will be incremented.
+        let mut cur_num_users = 1;
+        while let Err(old_num_users) = target.num_users.compare_exchange(cur_num_users, cur_num_users + 1, Ordering::SeqCst, Ordering::Acquire) {
+            if old_num_users == 0 {
+                // Cannot send the message. Whatever the component's state is
+                // (destroyed, at a different generation number, busy being
+                // destroyed, etc.) we cannot send the message and will not
+                // modify the component.
+                return false;
+            }
+
+            cur_num_users = old_num_users;
+        }
+
+        // We incremented the counter, but we might still be at the wrong
+        // generation number. The generation number is a monotonically
+        // increasing value. Since it only increases when someone brings the
+        // `num_users` counter to 0, we can simply load the generation number.
+        let generation = target.generation.load(Ordering::Acquire);
+        if generation != target_id.generation {
+            // We're at the wrong generation, so we cannot send the message.
+            // However, since we incremented the `num_users` counter, the
+            // moment we decrement it we might be the one that is supposed to
+            // handle the destruction of the component. Since all users of the
+            // component pair each increment with a decrement, we can simply
+            // do a `fetch_sub`.
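+            // Our own increment from above is still counted at this point, so
+            // the counter is at least 1 and the decrement cannot underflow.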
+            let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst);
+            if old_num_users == 1 {
+                // We're the one that brought the counter to 0, so we're the
+                // one that is supposed to handle the component's exit.
+                self.finish_component_destruction(target_id);
+            }
+
+            return false;
+        }
+
+        // The generation is correct, and since we incremented the `num_users`
+        // counter we're now sure that we can send the message and that it
+        // will be handled by the receiver.
+        target.connector.public.inbox.insert_message(message);
+
+        // Finally, do the same as above: decrement the number of users; if it
+        // gets to 0 we're the ones who should handle the exit condition.
+        let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst);
+        if old_num_users == 1 {
+            // We're allowed to destroy the component.
+            self.finish_component_destruction(target_id);
+        } else {
+            // Message is sent. If the component is sleeping, then we're sure
+            // it is not scheduled and that it has not initiated its own
+            // destruction (because `initiate_component_destruction` does not
+            // set `sleeping` back to true). So we can safely schedule it.
+            let should_wake_up = target.connector.public.sleeping
+                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
+                .is_ok();
+
+            if should_wake_up {
+                let key = unsafe{ ConnectorKey::from_id(target_id) };
+                self.push_work(key);
+            }
+        }
+
+        return true;
+    }
+
+    /// Sends a message to a particular component, where the message is assumed
+    /// to travel over a port. If the component happened to be sleeping then it
+    /// will be scheduled for execution. Because of the port management system
+    /// we may assume that we're always accessing the component at the right
+    /// generation number.
+    pub(crate) fn send_message_assumed_alive(&self, target_id: ConnectorId, message: Message) {
+        let target = {
+            let lock = self.connectors.read().unwrap();
+            let entry = lock.get(target_id.index);
+            debug_assert_eq!(entry.generation.load(Ordering::Acquire), target_id.generation);
+            &mut entry.connector.public
+        };
+
         target.inbox.insert_message(message);
 
         let should_wake_up = target.sleeping
@@ -316,21 +405,56 @@ impl RuntimeInner {
         return key;
     }
 
+    /// Retrieves private access to the component through its key.
     #[inline]
     pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector {
-        let lock = self.connectors.read().unwrap();
-        return lock.get_private(connector_key);
+        let entry = {
+            let lock = self.connectors.read().unwrap();
+            lock.get(connector_key.index)
+        };
+
+        debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation, "private access to {:?}", connector_key);
+        return &mut entry.connector;
     }
 
-    #[inline]
-    pub(crate) fn get_component_public(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
-        let lock = self.connectors.read().unwrap();
-        return lock.get_public(connector_id);
+    // --- Managing component destruction
+
+    /// Starts component destruction; may only be done by the scheduler that is
+    /// executing the component. This might not actually destroy the component,
+    /// since other components might still be sending it messages.
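+    /// In that case, the last sender that brings `num_users` back down to 0
+    /// performs the cleanup itself via `finish_component_destruction`.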
+    fn initiate_component_destruction(&self, connector_key: ConnectorKey) {
+        // Most of the time no-one will be sending messages, so try immediate
+        // destruction.
+        let mut lock = self.connectors.write().unwrap();
+        let entry = lock.get(connector_key.index);
+        debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation);
+        debug_assert_eq!(entry.connector.public.sleeping.load(Ordering::Acquire), false); // not sleeping: caller is executing this component
+        let old_num_users = entry.num_users.fetch_sub(1, Ordering::SeqCst);
+        if old_num_users == 1 {
+            // We just brought the number of users down to 0. Destroy the
+            // component.
+            entry.connector.public.inbox.clear();
+            entry.generation.fetch_add(1, Ordering::SeqCst);
+            lock.destroy(connector_key);
+            self.decrement_active_components();
+        }
     }
 
-    pub(crate) fn destroy_component(&self, connector_key: ConnectorKey) {
+    fn finish_component_destruction(&self, connector_id: ConnectorId) {
         let mut lock = self.connectors.write().unwrap();
-        lock.destroy(connector_key);
+        let entry = lock.get(connector_id.index);
+        debug_assert_eq!(entry.num_users.load(Ordering::Acquire), 0);
+        let _old_generation = entry.generation.fetch_add(1, Ordering::SeqCst);
+        debug_assert_eq!(_old_generation, connector_id.generation);
+
+        // TODO: In the future we should not only clear out the inbox, but also
+        // send messages back to the senders indicating that the messages did
+        // not arrive.
+        entry.connector.public.inbox.clear();
+
+        // The invariant that only one thread may handle the internals of the
+        // component is preserved by the fact that only one thread can
+        // decrement `num_users` to 0.
+        lock.destroy(unsafe{ ConnectorKey::from_id(connector_id) });
         self.decrement_active_components();
     }
 
@@ -396,6 +520,7 @@ unsafe impl Sync for RuntimeInner {}
 struct StoreEntry {
     connector: ScheduledConnector,
     generation: std::sync::atomic::AtomicU32,
+    num_users: std::sync::atomic::AtomicU32,
 }
 
 struct ConnectorStore {
@@ -414,26 +539,13 @@ impl ConnectorStore {
         }
     }
 
-    /// Retrieves public part of connector - accessible by many threads at once.
-    fn get_public(&self, id: ConnectorId) -> &'static ConnectorPublic {
-        unsafe {
-            let entry = self.entries.get(id.index as usize);
-            debug_assert!(!entry.is_null());
-            let cur_generation = (**entry).generation.load(Ordering::Acquire);
-            assert_eq!(cur_generation, id.generation, "accessing {}", id.index);
-            return &(**entry).connector.public;
-        }
-    }
-
-    /// Retrieves private part of connector - accessible by one thread at a
-    /// time.
-    fn get_private(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
+    /// Directly retrieves an entry. There be dragons here: the `connector`
+    /// might already have had its destructor executed, and accessing it might
+    /// then lead to memory corruption.
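+    /// Callers must therefore validate the generation number, or hold a
+    /// `num_users` reference, before touching the entry's contents.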
+    fn get(&self, index: u32) -> &'static mut StoreEntry {
         unsafe {
-            let entry = self.entries.get_mut(key.index as usize);
-            debug_assert!(!entry.is_null());
-            let cur_generation = (**entry).generation.load(Ordering::Acquire);
-            assert_eq!(cur_generation, key.generation, "accessing {}", key.index);
-            return &mut (**entry).connector;
+            let entry = self.entries.get_mut(index as usize);
+            return &mut **entry;
         }
     }
 
@@ -462,6 +574,7 @@
             let connector = Box::into_raw(Box::new(StoreEntry{
                 connector,
                 generation: AtomicU32::new(0),
+                num_users: AtomicU32::new(1),
             }));
             self.entries.push(connector);
         } else {
@@ -471,6 +584,8 @@
             unsafe {
                 let target = &mut **self.entries.get_mut(index);
                 std::ptr::write(&mut target.connector as *mut _, connector);
+                let _old_num_users = target.num_users.fetch_add(1, Ordering::SeqCst);
+                debug_assert_eq!(_old_num_users, 0);
                 let generation = target.generation.load(Ordering::Acquire);
                 key = ConnectorKey{ index: index as u32, generation };
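Review note: the functions above (`send_message_maybe_destroyed`, `initiate_component_destruction`, `finish_component_destruction`) jointly implement a single liveness protocol. Below is a minimal, self-contained sketch of that protocol for reference; the `Slot` and `try_use` names are illustrative only and are not part of this patch.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

struct Slot {
    num_users: AtomicU32,  // 0 acts as a tombstone: the slot may be recycled
    generation: AtomicU32, // bumped once per recycle of this slot index
}

impl Slot {
    /// Tries to run `f` against the slot at `expected_gen`. Returns false if
    /// the slot was destroyed or recycled in the meantime.
    fn try_use(&self, expected_gen: u32, f: impl FnOnce()) -> bool {
        // 1. Increment `num_users`, but never from 0: once the count reaches
        //    0 the slot is (being) destroyed and must not be revived.
        let mut cur = 1;
        while let Err(old) = self.num_users.compare_exchange(
            cur, cur + 1, Ordering::SeqCst, Ordering::Acquire,
        ) {
            if old == 0 {
                return false;
            }
            cur = old;
        }

        // 2. While we hold a count the generation cannot advance, so a plain
        //    load is enough to detect that the slot index was recycled.
        let live = self.generation.load(Ordering::Acquire) == expected_gen;
        if live {
            f(); // e.g. push a message into the slot's inbox
        }

        // 3. Release our count. Whoever brings it to 0 is uniquely
        //    responsible for cleanup, as in `finish_component_destruction`.
        if self.num_users.fetch_sub(1, Ordering::SeqCst) == 1 {
            self.generation.fetch_add(1, Ordering::SeqCst);
            // ...clear the inbox, return the index to the freelist, etc.
        }
        live
    }
}

fn main() {
    // The owner (scheduler) holds the initial count of 1, as in `create()`.
    let slot = Slot { num_users: AtomicU32::new(1), generation: AtomicU32::new(0) };
    assert!(slot.try_use(0, || println!("message delivered")));
    assert!(!slot.try_use(7, || unreachable!("stale generation")));
}
```

The scheme is essentially the upgrade-or-fail pattern of `std::sync::Weak::upgrade`: a count of 0 is a tombstone that can never be incremented away from, which is what makes the plain `fetch_sub` on the release side safe and guarantees that exactly one party performs the cleanup.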