diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 7705f4569af80e9519147c0bce2d6a763eb7d0c6..4f5521f60edc808741ce681d6e54958fab1466c4 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -26,17 +26,15 @@ // - TODO: Write about handling messages, consensus wrapping data // - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx -use std::collections::HashMap; use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; -use crate::common::ComponentState; use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup}; -use crate::protocol::{RunContext, RunResult}; +use crate::protocol::RunContext; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement}; use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_value_group}; -use super::inbox::{DataMessage, Message, SyncCompMessage, SyncCompContent, SyncPortMessage, SyncControlMessage, PublicInbox}; +use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, SyncControlMessage, PublicInbox}; use super::native::Connector; use super::port::{PortKind, PortIdLocal}; use super::scheduler::{ComponentCtx, SchedulerCtx, MessageTicket}; diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index b96ff7ac38f93de7a38a605dd8f715a2f711d899..3dcc486fd31924a0df5c3f1ae47caa13009fbb0f 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1199,7 +1199,7 @@ impl SolutionCombiner { } fn add_presence_and_check_for_global_failure(&mut self, component_id: ConnectorId, channels: &[LocalChannelPresence]) -> bool { - 'new_report_loop: for entry in channels { + for entry in channels { let mut found = false; for existing in &mut self.presence { diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index 1c952c4c4c63f30f108ca3580e208026d69be1a1..d3d822a5121b0856daf4a64968aec3ec3b579487 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -6,7 +6,6 @@ use crate::runtime2::consensus::{ComponentPresence, SolutionCombiner}; use crate::runtime2::port::ChannelId; use super::ConnectorId; -use super::branch::BranchId; use super::consensus::{GlobalSolution, LocalSolution}; use super::port::PortIdLocal; @@ -232,4 +231,9 @@ impl PublicInbox { let lock = self.messages.lock().unwrap(); return lock.is_empty(); } + + pub fn clear(&self) { + let mut lock = self.messages.lock().unwrap(); + lock.clear(); + } } \ No newline at end of file diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index cd8b2ab381c4e34b954794afae1a6d598616d5ad..060feaa2aaff8b148d35d7ab0b554120895a197e 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -29,6 +29,7 @@ use port::{ChannelId, Port, PortState}; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this /// key is the only one that may execute the connector's code. +#[derive(Debug)] pub(crate) struct ConnectorKey { pub index: u32, // of connector pub generation: u32, @@ -273,10 +274,98 @@ impl RuntimeInner { return (getter_port, putter_port); } - /// Sends a message to a particular connector. If the connector happened to - /// be sleeping then it will be scheduled for execution. - pub(crate) fn send_message(&self, target_id: ConnectorId, message: Message) { - let target = self.get_component_public(target_id); + /// Sends a message directly (without going through the port) to a + /// component. 
This is slightly less efficient than sending over a port, but
+    /// might be preferable for some algorithms. If the component was sleeping
+    /// then it is scheduled for execution.
+    pub(crate) fn send_message_maybe_destroyed(&self, target_id: ConnectorId, message: Message) -> bool {
+        let target = {
+            let lock = self.connectors.read().unwrap();
+            lock.get(target_id.index)
+        };
+
+        // Do a CAS on the number of users. In the most common case the
+        // component is alive and we're the only one sending the message. Note
+        // that if we finish this block, we're sure that no-one has set
+        // `num_users` to 0. This is essential! When at 0, the component is
+        // added to the freelist and the generation counter will be incremented.
+        let mut cur_num_users = 1;
+        while let Err(old_num_users) = target.num_users.compare_exchange(cur_num_users, cur_num_users + 1, Ordering::SeqCst, Ordering::Acquire) {
+            if old_num_users == 0 {
+                // Cannot send the message. Whatever the component state is
+                // (destroyed, at a different generation number, busy being
+                // destroyed, etc.) we cannot send the message and will not
+                // modify the component.
+                return false;
+            }
+
+            cur_num_users = old_num_users;
+        }
+
+        // We incremented the counter. But we might still be at the wrong
+        // generation number. The generation number is a monotonically
+        // increasing value. Since it only increases when someone gets the
+        // `num_users` counter to 0, we can simply load the generation number.
+        let generation = target.generation.load(Ordering::Acquire);
+        if generation != target_id.generation {
+            // We're at the wrong generation, so we cannot send the message.
+            // However, since we incremented the `num_users` counter, the moment
+            // we decrement it we might be the one that is supposed to handle
+            // the destruction of the component. Since all users of the
+            // component do an increment followed by a decrement, we can simply
+            // do a `fetch_sub`.
+            let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst);
+            if old_num_users == 1 {
+                // We're the one that got the counter to 0, so we're the ones
+                // that are supposed to handle component exit
+                self.finish_component_destruction(target_id);
+            }
+
+            return false;
+        }
+
+        // The generation is correct, and since we incremented the `num_users`
+        // counter we're now sure that we can send the message and it will be
+        // handled by the receiver
+        target.connector.public.inbox.insert_message(message);
+
+        // Finally, do the same as above: decrement the number of users; if it
+        // gets to 0 we're the ones who should handle the exit condition.
+        let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst);
+        if old_num_users == 1 {
+            // We're allowed to destroy the component.
+            self.finish_component_destruction(target_id);
+        } else {
+            // Message is sent. If the component is sleeping, then we're sure
+            // it is not scheduled and it has not initiated the destruction of
+            // the component (because of the way
+            // `initiate_component_destruction` does not set sleeping to true).
+            // So we can safely schedule it.
+            let should_wake_up = target.connector.public.sleeping
+                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
+                .is_ok();
+
+            if should_wake_up {
+                let key = unsafe{ ConnectorKey::from_id(target_id) };
+                self.push_work(key);
+            }
+        }
+
+        return true;
+    }
+
+    /// Sends a message to a particular component, assumed to occur over a port.
+    /// If the component happened to be sleeping then it will be scheduled for
+    /// execution. Because of the port management system we may assume that
+    /// we're always accessing the component at the right generation number.
+    pub(crate) fn send_message_assumed_alive(&self, target_id: ConnectorId, message: Message) {
+        let target = {
+            let lock = self.connectors.read().unwrap();
+            let entry = lock.get(target_id.index);
+            debug_assert_eq!(entry.generation.load(Ordering::Acquire), target_id.generation);
+            &mut entry.connector.public
+        };
+
         target.inbox.insert_message(message);
 
         let should_wake_up = target.sleeping
@@ -316,21 +405,56 @@ impl RuntimeInner {
         return key;
     }
 
+    /// Retrieve private access to the component through its key.
     #[inline]
     pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector {
-        let lock = self.connectors.read().unwrap();
-        return lock.get_private(connector_key);
+        let entry = {
+            let lock = self.connectors.read().unwrap();
+            lock.get(connector_key.index)
+        };
+
+        debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation, "private access to {:?}", connector_key);
+        return &mut entry.connector;
     }
 
-    #[inline]
-    pub(crate) fn get_component_public(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
-        let lock = self.connectors.read().unwrap();
-        return lock.get_public(connector_id);
+    // --- Managing component destruction
+
+    /// Starts component destruction; may only be done by the scheduler that
+    /// is executing the component. This might not actually destroy the
+    /// component, since other components might be sending it messages.
+    fn initiate_component_destruction(&self, connector_key: ConnectorKey) {
+        // Most of the time no-one will be sending messages, so try
+        // immediate destruction
+        let mut lock = self.connectors.write().unwrap();
+        let entry = lock.get(connector_key.index);
+        debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation);
+        debug_assert_eq!(entry.connector.public.sleeping.load(Ordering::Acquire), false); // not sleeping: caller is executing this component
+        let old_num_users = entry.num_users.fetch_sub(1, Ordering::SeqCst);
+        if old_num_users == 1 {
+            // We just brought the number of users down to 0. Destroy the
+            // component
+            entry.connector.public.inbox.clear();
+            entry.generation.fetch_add(1, Ordering::SeqCst);
+            lock.destroy(connector_key);
+            self.decrement_active_components();
+        }
     }
 
-    pub(crate) fn destroy_component(&self, connector_key: ConnectorKey) {
+    fn finish_component_destruction(&self, connector_id: ConnectorId) {
         let mut lock = self.connectors.write().unwrap();
-        lock.destroy(connector_key);
+        let entry = lock.get(connector_id.index);
+        debug_assert_eq!(entry.num_users.load(Ordering::Acquire), 0);
+        let _old_generation = entry.generation.fetch_add(1, Ordering::SeqCst);
+        debug_assert_eq!(_old_generation, connector_id.generation);
+
+        // TODO: In the future we should not only clear out the inbox, but send
+        // messages back to the senders indicating the messages did not arrive.
+        entry.connector.public.inbox.clear();
+
+        // The invariant that only one thread may handle the internals of the
+        // component is preserved by the fact that only one thread can decrement
+        // `num_users` to 0. 
+ lock.destroy(unsafe{ ConnectorKey::from_id(connector_id) }); self.decrement_active_components(); } @@ -396,6 +520,7 @@ unsafe impl Sync for RuntimeInner {} struct StoreEntry { connector: ScheduledConnector, generation: std::sync::atomic::AtomicU32, + num_users: std::sync::atomic::AtomicU32, } struct ConnectorStore { @@ -414,26 +539,13 @@ impl ConnectorStore { } } - /// Retrieves public part of connector - accessible by many threads at once. - fn get_public(&self, id: ConnectorId) -> &'static ConnectorPublic { - unsafe { - let entry = self.entries.get(id.index as usize); - debug_assert!(!entry.is_null()); - let cur_generation = (**entry).generation.load(Ordering::Acquire); - assert_eq!(cur_generation, id.generation, "accessing {}", id.index); - return &(**entry).connector.public; - } - } - - /// Retrieves private part of connector - accessible by one thread at a - /// time. - fn get_private(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector { + /// Directly retrieves an entry. There be dragons here. The `connector` + /// might have its destructor already executed. Accessing it might then lead + /// to memory corruption. + fn get(&self, index: u32) -> &'static mut StoreEntry { unsafe { - let entry = self.entries.get_mut(key.index as usize); - debug_assert!(!entry.is_null()); - let cur_generation = (**entry).generation.load(Ordering::Acquire); - assert_eq!(cur_generation, key.generation, "accessing {}", key.index); - return &mut (**entry).connector; + let entry = self.entries.get_mut(index as usize); + return &mut **entry; } } @@ -462,6 +574,7 @@ impl ConnectorStore { let connector = Box::into_raw(Box::new(StoreEntry{ connector, generation: AtomicU32::new(0), + num_users: AtomicU32::new(1), })); self.entries.push(connector); } else { @@ -471,6 +584,8 @@ impl ConnectorStore { unsafe { let target = &mut **self.entries.get_mut(index); std::ptr::write(&mut target.connector as *mut _, connector); + let _old_num_users = target.num_users.fetch_add(1, Ordering::SeqCst); + debug_assert_eq!(_old_num_users, 0); let generation = target.generation.load(Ordering::Acquire); key = ConnectorKey{ index: index as u32, generation }; diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index bd18c4424c96977ba644e6e430c04867dadc3b14..a7bdc3e614ad8ca78aa7ef7aee1d31e5afd455fd 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -1,13 +1,11 @@ use std::collections::VecDeque; use std::sync::{Arc, Mutex, Condvar}; -use std::sync::atomic::Ordering; -use std::collections::HashMap; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use crate::runtime2::consensus::RoundConclusion; -use super::{ConnectorKey, ConnectorId, RuntimeInner}; +use super::{ConnectorId, RuntimeInner}; use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState}; use super::scheduler::{SchedulerCtx, ComponentCtx, MessageTicket}; use super::port::{Port, PortIdLocal, Channel, PortKind}; @@ -534,21 +532,12 @@ impl ApplicationInterface { } fn wake_up_connector_with_ping(&self) { - let connector = self.runtime.get_component_public(self.connector_id); - connector.inbox.insert_message(Message::Control(ControlMessage { + let message = ControlMessage { id: 0, sending_component_id: self.connector_id, content: ControlContent::Ping, - })); - - let should_wake_up = connector.sleeping - .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); - - if should_wake_up { - let key = unsafe{ ConnectorKey::from_id(self.connector_id) }; - self.runtime.push_work(key); - } + 
}; + self.runtime.send_message_maybe_destroyed(self.connector_id, Message::Control(message)); } fn find_port_by_id(&self, port_id: PortIdLocal) -> Option { diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 4f7c83bbe328312359205cdaa20633ea9b30cca9..e4a3d4d6cefef1cf9392a792a80778a67e54a0fe 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,8 +1,7 @@ use std::collections::VecDeque; -use std::mem::MaybeUninit; use std::sync::Arc; use std::sync::atomic::Ordering; -use crate::collections::RawVec; + use crate::protocol::eval::EvalError; use crate::runtime2::port::ChannelId; @@ -12,7 +11,7 @@ use super::native::Connector; use super::branch::{BranchId}; use super::connector::{ConnectorPDL, ConnectorScheduling}; use super::inbox::{ - Message, DataMessage, SyncHeader, + Message, DataMessage, ControlMessage, ControlContent, SyncControlMessage, SyncControlContent, }; @@ -68,7 +67,7 @@ impl Scheduler { if scheduled.router.num_pending_acks() == 0 { // We're actually done, we can safely destroy the // currently running connector - self.runtime.destroy_component(connector_key); + self.runtime.initiate_component_destruction(connector_key); continue 'thread_loop; } else { cur_schedule = ConnectorScheduling::NotNow; @@ -113,7 +112,7 @@ impl Scheduler { connector_id ); self.debug_conn(connector_id, &format!("Sending message to {:?} [ exit ] \n --- {:?}", port.peer_connector, message)); - self.runtime.send_message(port.peer_connector, Message::Control(message)); + self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(message)); } } @@ -126,7 +125,7 @@ impl Scheduler { if scheduled.router.num_pending_acks() == 0 { // All ports (if any) already closed - self.runtime.destroy_component(connector_key); + self.runtime.initiate_component_destruction(connector_key); continue 'thread_loop; } @@ -152,7 +151,7 @@ impl Scheduler { // We insert directly into the private inbox. Since we have // a reroute entry the component can not yet be running. 
if let Message::Control(_) = &message { - self.runtime.send_message(other_component_id, message); + self.runtime.send_message_assumed_alive(other_component_id, message); } else { let key = unsafe { ConnectorKey::from_id(other_component_id) }; let component = self.runtime.get_component_private(&key); @@ -199,7 +198,7 @@ impl Scheduler { content: ControlContent::Ack, }); self.debug_conn(connector_id, &format!("Sending message to {:?} [pp ack]\n --- {:?}", message.sending_component_id, ack_message)); - self.runtime.send_message(message.sending_component_id, ack_message); + self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message); }, ControlContent::CloseChannel(port_id) => { // Mark the port as being closed @@ -213,7 +212,7 @@ impl Scheduler { content: ControlContent::Ack, }); self.debug_conn(connector_id, &format!("Sending message to {:?} [cc ack] \n --- {:?}", message.sending_component_id, ack_message)); - self.runtime.send_message(message.sending_component_id, ack_message); + self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message); }, ControlContent::Ack => { if let Some(component_key) = scheduled.router.handle_ack(message.id) { @@ -258,7 +257,7 @@ impl Scheduler { content: SyncControlContent::ChannelIsClosed(port.peer_id), }; self.debug_conn(scheduled.ctx.id, &format!("Sending message to {:?} [shutdown]\n --- {:?}", port.peer_connector, message)); - self.runtime.send_message(port.peer_connector, Message::SyncControl(message)); + self.runtime.send_message_assumed_alive(port.peer_connector, Message::SyncControl(message)); } } } @@ -272,7 +271,7 @@ impl Scheduler { // Handling any messages that were sent while let Some(message) = scheduled.ctx.outbox.pop_front() { - let target_component_id = match &message { + let (target_component_id, over_port) = match &message { Message::Data(content) => { // Data messages are always sent to a particular port, and // may end up being rerouted. @@ -283,14 +282,14 @@ impl Scheduler { todo!("handle sending over a closed port") } - port_desc.peer_connector + (port_desc.peer_connector, true) }, Message::SyncComp(content) => { // Sync messages are always sent to a particular component, // the sender must make sure it actually wants to send to // the specified component (and is not using an inconsistent // component ID associated with a port). 
- content.target_component_id + (content.target_component_id, false) }, Message::SyncPort(content) => { let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap(); @@ -299,14 +298,18 @@ impl Scheduler { todo!("handle sending over a closed port") } - port_desc.peer_connector + (port_desc.peer_connector, true) }, Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"), Message::Control(_) => unreachable!("component sending 'Control' messages directly"), }; - self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox] \n --- {:#?}", target_component_id, message)); - self.runtime.send_message(target_component_id, message); + self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox, over port: {}] \n --- {:#?}", target_component_id, over_port, message)); + if over_port { + self.runtime.send_message_assumed_alive(target_component_id, message); + } else { + self.runtime.send_message_maybe_destroyed(target_component_id, message); + } } while let Some(state_change) = scheduled.ctx.state_changes.pop_front() { @@ -354,7 +357,7 @@ impl Scheduler { new_component_id, port.self_id ); self.debug_conn(connector_id, &format!("Sending message to {:?} [newcom]\n --- {:#?}", port.peer_connector, control_message)); - self.runtime.send_message(port.peer_connector, Message::Control(control_message)); + self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(control_message)); } } }, @@ -647,14 +650,11 @@ impl<'a> Iterator for MessagesIter<'a> { /// continuously re-read. Others are taken out, but may potentially be put back /// for later reading. Later reading in this case implies that they are put back /// for reading in the next sync round. +/// TODO: Again, lazy concurrency, see git history for other implementation struct Inbox { - temp_m: Vec, - temp_d: Vec, - messages: RawVec, - next_delay_idx: u32, - start_read_idx: u32, + messages: Vec, + delayed: Vec, next_read_idx: u32, - last_read_idx: u32, generation: u32, } @@ -667,188 +667,77 @@ pub(crate) struct MessageTicket { impl Inbox { fn new() -> Self { return Inbox { - temp_m: Vec::new(), temp_d: Vec::new(), - messages: RawVec::new(), - next_delay_idx: 0, - start_read_idx: 0, + messages: Vec::new(), + delayed: Vec::new(), next_read_idx: 0, - last_read_idx: 0, generation: 0, } } fn insert_new(&mut self, message: Message) { assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size - self.temp_m.push(message); - return; self.messages.push(message); } fn get_next_message_ticket(&mut self) -> Option { - if self.next_read_idx as usize >= self.temp_m.len() { return None }; + if self.next_read_idx as usize >= self.messages.len() { return None }; let idx = self.next_read_idx; self.generation += 1; self.next_read_idx += 1; return Some(MessageTicket{ index: idx, generation: self.generation }); - let cur_read_idx = self.next_read_idx as usize; - if cur_read_idx >= self.messages.len() { - return None; - } - - self.generation += 1; - self.next_read_idx += 1; - return Some(MessageTicket{ - index: cur_read_idx as u32, - generation: self.generation - }); } fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message { debug_assert_eq!(self.generation, ticket.generation); - return &self.temp_m[ticket.index as usize]; - return unsafe{ &*self.messages.get(ticket.index as usize) } + return &self.messages[ticket.index as usize]; } fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message { debug_assert_eq!(self.generation, ticket.generation); 
debug_assert!(ticket.index < self.next_read_idx); self.next_read_idx -= 1; - return self.temp_m.remove(ticket.index as usize); - unsafe { - let take_idx = ticket.index as usize; - let val = std::ptr::read(self.messages.get(take_idx)); - - // Move messages to the right, clearing up space in the - // front. - let num_move_right = take_idx - self.start_read_idx as usize; - self.messages.move_range( - self.start_read_idx as usize, - self.start_read_idx as usize + 1, - num_move_right - ); - - self.start_read_idx += 1; - - return val; - } + return self.messages.remove(ticket.index as usize); } fn put_back_message(&mut self, message: Message) { // We have space in front of the array because we've taken out a message // before. - self.temp_d.push(message); - return; - debug_assert!(self.next_delay_idx < self.start_read_idx); - unsafe { - // Write to front of the array - std::ptr::write(self.messages.get_mut(self.next_delay_idx as usize), message); - self.next_delay_idx += 1; - } + self.delayed.push(message); } fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter { - return MessagesIter{ - messages: self.temp_m.as_slice(), - next_index: self.start_read_idx as usize, - max_index: self.next_read_idx as usize, - match_port_id - }; return MessagesIter{ messages: self.messages.as_slice(), - next_index: self.start_read_idx as usize, + next_index: 0, max_index: self.next_read_idx as usize, match_port_id }; } fn clear_read_messages(&mut self) { - self.temp_m.drain(0..self.next_read_idx as usize); - for (idx, v) in self.temp_d.drain(..).enumerate() { - self.temp_m.insert(idx, v); + self.messages.drain(0..self.next_read_idx as usize); + for (idx, v) in self.delayed.drain(..).enumerate() { + self.messages.insert(idx, v); } self.next_read_idx = 0; - return; - // Deallocate everything that was read - self.destroy_range(self.start_read_idx, self.next_read_idx); - self.generation += 1; - - // Join up all remaining values with the delayed ones in the front - let num_to_move = self.messages.len() - self.next_read_idx as usize; - self.messages.move_range( - self.next_read_idx as usize, - self.next_delay_idx as usize, - num_to_move - ); - - // Set all indices (and the RawVec len) to make sense in this new state - let new_len = self.next_delay_idx as usize + num_to_move; - self.next_delay_idx = 0; - self.start_read_idx = 0; - self.next_read_idx = 0; - self.messages.len = new_len; } fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) { - debug_assert!(self.temp_d.is_empty()); + debug_assert!(self.delayed.is_empty()); let mut idx = 0; - while idx < self.temp_m.len() { - let msg = &self.temp_m[idx]; + while idx < self.messages.len() { + let msg = &self.messages[idx]; if let Some(target) = msg.target_port() { if target == port { - new_inbox.temp_m.push(self.temp_m.remove(idx)); + new_inbox.messages.push(self.messages.remove(idx)); continue; } } idx += 1; } - return; - - let mut idx = 0; - while idx < self.messages.len() { - let message = unsafe{ &*self.messages.get(idx) }; - if let Some(target_port) = message.target_port() { - if target_port == port { - // Transfer port - unsafe { - let message = std::ptr::read(message as *const _); - let remaining = self.messages.len() - idx - 1; // idx < len, due to loop condition - if remaining > 0 { - self.messages.move_range(idx + 1, idx, remaining); - } - self.messages.len -= 1; - new_inbox.insert_new(message); - } - - continue; // do not increment index - } - } - - idx += 1; - } - } - - #[inline] - fn destroy_range(&mut self, 
start_idx: u32, end_idx: u32) { - for idx in (start_idx as usize)..(end_idx as usize) { - unsafe { - let msg = self.messages.get_mut(idx); - std::ptr::drop_in_place(msg); - } - } } } -// -// impl Drop for Inbox { -// fn drop(&mut self) { -// // Whether in sync or not in sync. We have two ranges of allocated -// // messages: -// // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync) -// // - readable messages: from `start_read_idx` to `messages.len` -// self.destroy_range(0, self.next_delay_idx); -// self.destroy_range(self.start_read_idx, self.messages.len as u32); -// } -// } // ----------------------------------------------------------------------------- // Control messages diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 3de2cced47acc7d12adcecc814e2290966161505..1f0a8423beda972cb7afcddabf978a3c57435192 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -16,7 +16,7 @@ use crate::runtime2::native::{ApplicationSyncAction}; // pub(crate) const NUM_LOOPS: u32 = 10; // number of loops within a single test (not used by all tests) pub(crate) const NUM_THREADS: u32 = 6; -pub(crate) const NUM_INSTANCES: u32 = 1; +pub(crate) const NUM_INSTANCES: u32 = 2; pub(crate) const NUM_LOOPS: u32 = 1;
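
// Not part of the patch above: a minimal, self-contained sketch of the
// liveness scheme the patch introduces in `send_message_maybe_destroyed`,
// `initiate_component_destruction` and `finish_component_destruction`. The
// names here (`Slot`, `try_acquire`, `release`, `alive`) are illustrative
// only; the real runtime stores a `ScheduledConnector` and clears an inbox
// instead of flipping the `alive` flag used below.
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

struct Slot {
    generation: AtomicU32, // bumped whenever the slot is destroyed and recycled
    num_users: AtomicU32,  // 0 means the slot is (being) destroyed
    alive: AtomicBool,     // stand-in for "the component's inbox still exists"
}

impl Slot {
    /// Tries to acquire the slot for sending: increment `num_users` with a CAS
    /// loop that never resurrects a counter that already reached 0, then check
    /// that the generation still matches the id the sender holds.
    fn try_acquire(&self, expected_generation: u32) -> bool {
        let mut cur = 1;
        while let Err(old) = self.num_users.compare_exchange(
            cur, cur + 1, Ordering::SeqCst, Ordering::Acquire)
        {
            if old == 0 {
                return false; // already released, do not touch the slot
            }
            cur = old;
        }

        if self.generation.load(Ordering::Acquire) != expected_generation {
            // Wrong generation: undo the increment. Whoever brings the counter
            // to 0 performs the destruction, exactly as in the patch.
            self.release();
            return false;
        }
        true
    }

    /// Drops one reference; the caller that brings `num_users` to 0 runs the
    /// destruction step and invalidates stale ids by bumping `generation`.
    fn release(&self) {
        if self.num_users.fetch_sub(1, Ordering::SeqCst) == 1 {
            self.alive.store(false, Ordering::Release); // "clear the inbox"
            self.generation.fetch_add(1, Ordering::SeqCst);
        }
    }
}

fn main() {
    let slot = Slot {
        generation: AtomicU32::new(0),
        num_users: AtomicU32::new(1), // the owning scheduler holds one reference
        alive: AtomicBool::new(true),
    };

    // A sender holding an id at the current generation may deliver a message.
    assert!(slot.try_acquire(0));
    slot.release(); // sender done; the owner still holds its reference

    // The owner gives up its reference: the slot is destroyed and recycled.
    slot.release();
    assert!(!slot.alive.load(Ordering::Acquire));
    assert!(!slot.try_acquire(0)); // stale ids are now rejected
}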