diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 4f7c83bbe328312359205cdaa20633ea9b30cca9..e4a3d4d6cefef1cf9392a792a80778a67e54a0fe 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,8 +1,7 @@ use std::collections::VecDeque; -use std::mem::MaybeUninit; use std::sync::Arc; use std::sync::atomic::Ordering; -use crate::collections::RawVec; + use crate::protocol::eval::EvalError; use crate::runtime2::port::ChannelId; @@ -12,7 +11,7 @@ use super::native::Connector; use super::branch::{BranchId}; use super::connector::{ConnectorPDL, ConnectorScheduling}; use super::inbox::{ - Message, DataMessage, SyncHeader, + Message, DataMessage, ControlMessage, ControlContent, SyncControlMessage, SyncControlContent, }; @@ -68,7 +67,7 @@ impl Scheduler { if scheduled.router.num_pending_acks() == 0 { // We're actually done, we can safely destroy the // currently running connector - self.runtime.destroy_component(connector_key); + self.runtime.initiate_component_destruction(connector_key); continue 'thread_loop; } else { cur_schedule = ConnectorScheduling::NotNow; @@ -113,7 +112,7 @@ impl Scheduler { connector_id ); self.debug_conn(connector_id, &format!("Sending message to {:?} [ exit ] \n --- {:?}", port.peer_connector, message)); - self.runtime.send_message(port.peer_connector, Message::Control(message)); + self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(message)); } } @@ -126,7 +125,7 @@ impl Scheduler { if scheduled.router.num_pending_acks() == 0 { // All ports (if any) already closed - self.runtime.destroy_component(connector_key); + self.runtime.initiate_component_destruction(connector_key); continue 'thread_loop; } @@ -152,7 +151,7 @@ impl Scheduler { // We insert directly into the private inbox. Since we have // a reroute entry the component can not yet be running. if let Message::Control(_) = &message { - self.runtime.send_message(other_component_id, message); + self.runtime.send_message_assumed_alive(other_component_id, message); } else { let key = unsafe { ConnectorKey::from_id(other_component_id) }; let component = self.runtime.get_component_private(&key); @@ -199,7 +198,7 @@ impl Scheduler { content: ControlContent::Ack, }); self.debug_conn(connector_id, &format!("Sending message to {:?} [pp ack]\n --- {:?}", message.sending_component_id, ack_message)); - self.runtime.send_message(message.sending_component_id, ack_message); + self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message); }, ControlContent::CloseChannel(port_id) => { // Mark the port as being closed @@ -213,7 +212,7 @@ impl Scheduler { content: ControlContent::Ack, }); self.debug_conn(connector_id, &format!("Sending message to {:?} [cc ack] \n --- {:?}", message.sending_component_id, ack_message)); - self.runtime.send_message(message.sending_component_id, ack_message); + self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message); }, ControlContent::Ack => { if let Some(component_key) = scheduled.router.handle_ack(message.id) { @@ -258,7 +257,7 @@ impl Scheduler { content: SyncControlContent::ChannelIsClosed(port.peer_id), }; self.debug_conn(scheduled.ctx.id, &format!("Sending message to {:?} [shutdown]\n --- {:?}", port.peer_connector, message)); - self.runtime.send_message(port.peer_connector, Message::SyncControl(message)); + self.runtime.send_message_assumed_alive(port.peer_connector, Message::SyncControl(message)); } } } @@ -272,7 +271,7 @@ impl Scheduler { // Handling any messages that were sent while let Some(message) = scheduled.ctx.outbox.pop_front() { - let target_component_id = match &message { + let (target_component_id, over_port) = match &message { Message::Data(content) => { // Data messages are always sent to a particular port, and // may end up being rerouted. @@ -283,14 +282,14 @@ impl Scheduler { todo!("handle sending over a closed port") } - port_desc.peer_connector + (port_desc.peer_connector, true) }, Message::SyncComp(content) => { // Sync messages are always sent to a particular component, // the sender must make sure it actually wants to send to // the specified component (and is not using an inconsistent // component ID associated with a port). - content.target_component_id + (content.target_component_id, false) }, Message::SyncPort(content) => { let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap(); @@ -299,14 +298,18 @@ impl Scheduler { todo!("handle sending over a closed port") } - port_desc.peer_connector + (port_desc.peer_connector, true) }, Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"), Message::Control(_) => unreachable!("component sending 'Control' messages directly"), }; - self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox] \n --- {:#?}", target_component_id, message)); - self.runtime.send_message(target_component_id, message); + self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox, over port: {}] \n --- {:#?}", target_component_id, over_port, message)); + if over_port { + self.runtime.send_message_assumed_alive(target_component_id, message); + } else { + self.runtime.send_message_maybe_destroyed(target_component_id, message); + } } while let Some(state_change) = scheduled.ctx.state_changes.pop_front() { @@ -354,7 +357,7 @@ impl Scheduler { new_component_id, port.self_id ); self.debug_conn(connector_id, &format!("Sending message to {:?} [newcom]\n --- {:#?}", port.peer_connector, control_message)); - self.runtime.send_message(port.peer_connector, Message::Control(control_message)); + self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(control_message)); } } }, @@ -647,14 +650,11 @@ impl<'a> Iterator for MessagesIter<'a> { /// continuously re-read. Others are taken out, but may potentially be put back /// for later reading. Later reading in this case implies that they are put back /// for reading in the next sync round. +/// TODO: Again, lazy concurrency, see git history for other implementation struct Inbox { - temp_m: Vec, - temp_d: Vec, - messages: RawVec, - next_delay_idx: u32, - start_read_idx: u32, + messages: Vec, + delayed: Vec, next_read_idx: u32, - last_read_idx: u32, generation: u32, } @@ -667,188 +667,77 @@ pub(crate) struct MessageTicket { impl Inbox { fn new() -> Self { return Inbox { - temp_m: Vec::new(), temp_d: Vec::new(), - messages: RawVec::new(), - next_delay_idx: 0, - start_read_idx: 0, + messages: Vec::new(), + delayed: Vec::new(), next_read_idx: 0, - last_read_idx: 0, generation: 0, } } fn insert_new(&mut self, message: Message) { assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size - self.temp_m.push(message); - return; self.messages.push(message); } fn get_next_message_ticket(&mut self) -> Option { - if self.next_read_idx as usize >= self.temp_m.len() { return None }; + if self.next_read_idx as usize >= self.messages.len() { return None }; let idx = self.next_read_idx; self.generation += 1; self.next_read_idx += 1; return Some(MessageTicket{ index: idx, generation: self.generation }); - let cur_read_idx = self.next_read_idx as usize; - if cur_read_idx >= self.messages.len() { - return None; - } - - self.generation += 1; - self.next_read_idx += 1; - return Some(MessageTicket{ - index: cur_read_idx as u32, - generation: self.generation - }); } fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message { debug_assert_eq!(self.generation, ticket.generation); - return &self.temp_m[ticket.index as usize]; - return unsafe{ &*self.messages.get(ticket.index as usize) } + return &self.messages[ticket.index as usize]; } fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message { debug_assert_eq!(self.generation, ticket.generation); debug_assert!(ticket.index < self.next_read_idx); self.next_read_idx -= 1; - return self.temp_m.remove(ticket.index as usize); - unsafe { - let take_idx = ticket.index as usize; - let val = std::ptr::read(self.messages.get(take_idx)); - - // Move messages to the right, clearing up space in the - // front. - let num_move_right = take_idx - self.start_read_idx as usize; - self.messages.move_range( - self.start_read_idx as usize, - self.start_read_idx as usize + 1, - num_move_right - ); - - self.start_read_idx += 1; - - return val; - } + return self.messages.remove(ticket.index as usize); } fn put_back_message(&mut self, message: Message) { // We have space in front of the array because we've taken out a message // before. - self.temp_d.push(message); - return; - debug_assert!(self.next_delay_idx < self.start_read_idx); - unsafe { - // Write to front of the array - std::ptr::write(self.messages.get_mut(self.next_delay_idx as usize), message); - self.next_delay_idx += 1; - } + self.delayed.push(message); } fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter { - return MessagesIter{ - messages: self.temp_m.as_slice(), - next_index: self.start_read_idx as usize, - max_index: self.next_read_idx as usize, - match_port_id - }; return MessagesIter{ messages: self.messages.as_slice(), - next_index: self.start_read_idx as usize, + next_index: 0, max_index: self.next_read_idx as usize, match_port_id }; } fn clear_read_messages(&mut self) { - self.temp_m.drain(0..self.next_read_idx as usize); - for (idx, v) in self.temp_d.drain(..).enumerate() { - self.temp_m.insert(idx, v); + self.messages.drain(0..self.next_read_idx as usize); + for (idx, v) in self.delayed.drain(..).enumerate() { + self.messages.insert(idx, v); } self.next_read_idx = 0; - return; - // Deallocate everything that was read - self.destroy_range(self.start_read_idx, self.next_read_idx); - self.generation += 1; - - // Join up all remaining values with the delayed ones in the front - let num_to_move = self.messages.len() - self.next_read_idx as usize; - self.messages.move_range( - self.next_read_idx as usize, - self.next_delay_idx as usize, - num_to_move - ); - - // Set all indices (and the RawVec len) to make sense in this new state - let new_len = self.next_delay_idx as usize + num_to_move; - self.next_delay_idx = 0; - self.start_read_idx = 0; - self.next_read_idx = 0; - self.messages.len = new_len; } fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) { - debug_assert!(self.temp_d.is_empty()); + debug_assert!(self.delayed.is_empty()); let mut idx = 0; - while idx < self.temp_m.len() { - let msg = &self.temp_m[idx]; + while idx < self.messages.len() { + let msg = &self.messages[idx]; if let Some(target) = msg.target_port() { if target == port { - new_inbox.temp_m.push(self.temp_m.remove(idx)); + new_inbox.messages.push(self.messages.remove(idx)); continue; } } idx += 1; } - return; - - let mut idx = 0; - while idx < self.messages.len() { - let message = unsafe{ &*self.messages.get(idx) }; - if let Some(target_port) = message.target_port() { - if target_port == port { - // Transfer port - unsafe { - let message = std::ptr::read(message as *const _); - let remaining = self.messages.len() - idx - 1; // idx < len, due to loop condition - if remaining > 0 { - self.messages.move_range(idx + 1, idx, remaining); - } - self.messages.len -= 1; - new_inbox.insert_new(message); - } - - continue; // do not increment index - } - } - - idx += 1; - } - } - - #[inline] - fn destroy_range(&mut self, start_idx: u32, end_idx: u32) { - for idx in (start_idx as usize)..(end_idx as usize) { - unsafe { - let msg = self.messages.get_mut(idx); - std::ptr::drop_in_place(msg); - } - } } } -// -// impl Drop for Inbox { -// fn drop(&mut self) { -// // Whether in sync or not in sync. We have two ranges of allocated -// // messages: -// // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync) -// // - readable messages: from `start_read_idx` to `messages.len` -// self.destroy_range(0, self.next_delay_idx); -// self.destroy_range(self.start_read_idx, self.messages.len as u32); -// } -// } // ----------------------------------------------------------------------------- // Control messages