diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 43574bcd840dadf478a2bd9ecd610dc7987b1709..964cb904b6e1f08be7932025bff98c05e14de662 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,6 +1,8 @@ use std::collections::VecDeque; +use std::mem::MaybeUninit; use std::sync::Arc; use std::sync::atomic::Ordering; +use crate::collections::RawVec; use crate::protocol::eval::EvalError; use crate::runtime2::port::ChannelId; @@ -137,7 +139,7 @@ impl Scheduler { // Check if the message has to be rerouted because we have moved the // target port to another component. self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message)); - if let Some(target_port) = Self::get_message_target_port(&message) { + if let Some(target_port) = message.target_port() { if let Some(other_component_id) = scheduled.router.should_reroute(target_port) { self.debug_conn(connector_id, " ... Rerouting the message"); self.runtime.send_message(other_component_id, message); @@ -186,14 +188,17 @@ impl Scheduler { self.runtime.send_message(message.sending_component_id, ack_message); }, ControlContent::Ack => { - scheduled.router.handle_ack(message.id); + if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) { + self.debug_conn(connector_id, &format!("Sending message [ack ack] \n --- {:?}", new_control_message)); + self.runtime.send_message(target_component, new_control_message); + }; }, ControlContent::Ping => {}, } }, _ => { // All other cases have to be handled by the component - scheduled.ctx.inbox_messages.push(message); + scheduled.ctx.inbox.insert_new(message); } } } @@ -208,8 +213,10 @@ impl Scheduler { // Note: we're not handling the public inbox, we're dealing with the // private one! debug_assert!(scheduled.shutting_down); - while let Some(message) = scheduled.ctx.read_next_message_even_if_not_in_sync() { - let target_port_and_round_number = match &message { + + while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() { + let message = scheduled.ctx.read_message_using_ticket(ticket); + let target_port_and_round_number = match message { Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)), Message::SyncComp(_) => None, Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)), @@ -293,21 +300,7 @@ impl Scheduler { for port_id in initial_ports { // Transfer messages associated with the transferred port - let mut message_idx = 0; - while message_idx < scheduled.ctx.inbox_messages.len() { - let message = &scheduled.ctx.inbox_messages[message_idx]; - if Self::get_message_target_port(message) == Some(port_id) { - // Need to transfer this message - // TODO: Revise messages, this is becoming messy and error-prone - let message = scheduled.ctx.inbox_messages.remove(message_idx); - if message_idx < scheduled.ctx.inbox_len_read { - scheduled.ctx.inbox_len_read -= 1; - } - new_connector.ctx.inbox_messages.push(message); - } else { - message_idx += 1; - } - } + scheduled.ctx.inbox.transfer_messages_for_port(port_id, &mut new_connector.ctx.inbox); // Transfer the port itself let port_index = scheduled.ctx.ports.iter() @@ -322,7 +315,8 @@ impl Scheduler { if port.state == PortState::Open { let reroute_message = scheduled.router.prepare_reroute( port.self_id, port.peer_id, scheduled.ctx.id, - port.peer_connector, new_connector.ctx.id + port.peer_connector, new_connector.ctx.id, + &mut new_connector.router ); self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message)); @@ -355,10 +349,9 @@ impl Scheduler { if scheduled.ctx.is_in_sync { // Just entered sync region } else { - // Just left sync region. So clear inbox up until the last - // message that was read. - scheduled.ctx.inbox_messages.drain(0..scheduled.ctx.inbox_len_read); - scheduled.ctx.inbox_len_read = 0; + // Just left sync region. So prepare inbox for the next sync + // round + scheduled.ctx.inbox.prepare_for_next_round(); } scheduled.ctx.changed_in_sync = false; // reset flag @@ -388,25 +381,6 @@ impl Scheduler { } } - #[inline] - fn get_message_target_port(message: &Message) -> Option { - match message { - Message::Data(data) => return Some(data.data_header.target_port), - Message::SyncComp(_) => {}, - Message::SyncPort(content) => return Some(content.target_port), - Message::SyncControl(_) => return None, - Message::Control(control) => { - match &control.content { - ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id), - ControlContent::CloseChannel(port_id) => return Some(*port_id), - ControlContent::Ping | ControlContent::Ack => {}, - } - }, - } - - return None - } - // TODO: Remove, this is debugging stuff fn debug(&self, message: &str) { println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); @@ -442,8 +416,7 @@ pub(crate) struct ComponentCtx { // Mostly managed by the scheduler pub(crate) id: ConnectorId, ports: Vec, - inbox_messages: Vec, - inbox_len_read: usize, + inbox: Inbox, // Submitted by the component is_in_sync: bool, changed_in_sync: bool, @@ -462,8 +435,7 @@ impl ComponentCtx { return Self{ id: ConnectorId::new_invalid(), ports: Vec::new(), - inbox_messages: Vec::new(), - inbox_len_read: 0, + inbox: Inbox::new(), is_in_sync: false, changed_in_sync: false, outbox: VecDeque::new(), @@ -570,43 +542,33 @@ impl ComponentCtx { /// those messages that have been previously received with /// `read_next_message`. pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter { - return MessagesIter { - messages: &self.inbox_messages, - next_index: 0, - max_index: self.inbox_len_read, - match_port_id - }; + return self.inbox.get_read_data_messages(match_port_id); } - /// Retrieves the next unread message from the inbox `None` if there are no - /// (new) messages to read. - // TODO: Fix the clone of the data message, entirely unnecessary - pub(crate) fn read_next_message(&mut self) -> Option { + pub(crate) fn get_next_message_ticket(&mut self) -> Option { if !self.is_in_sync { return None; } - return self.read_next_message_even_if_not_in_sync(); - } - - pub(crate) fn read_next_message_even_if_not_in_sync(&mut self) -> Option { - if self.inbox_len_read == self.inbox_messages.len() { return None; } - - // We want to keep data messages in the inbox, because we need to check - // them in the future. We don't want to keep sync messages around, we - // should only handle them once. Control messages should never be in - // here. - let message = &self.inbox_messages[self.inbox_len_read]; - match &message { - Message::Data(content) => { - // Keep message in inbox for later reading - self.inbox_len_read += 1; - return Some(Message::Data(content.clone())); - }, - Message::SyncComp(_) | Message::SyncPort(_) | Message::SyncControl(_) => { - // Remove message from inbox - let message = self.inbox_messages.remove(self.inbox_len_read); - return Some(message); - }, - Message::Control(_) => unreachable!("control message ended up in component inbox"), - } + return self.inbox.get_next_message_ticket(); + } + + #[inline] + pub(crate) fn get_next_message_ticket_even_if_not_in_sync(&mut self) -> Option { + return self.inbox.get_next_message_ticket(); + } + + #[inline] + pub(crate) fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message { + return self.inbox.read_message_using_ticket(ticket); + } + + #[inline] + pub(crate) fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message { + return self.inbox.take_message_using_ticket(ticket) + } + + /// Puts back a message back into the inbox. The reason being that the + /// message is actually part of the next sync round. This will + pub(crate) fn put_back_message(&mut self, message: Message) { + self.inbox.put_back_message(message); } } @@ -646,6 +608,173 @@ impl<'a> Iterator for MessagesIter<'a> { } } +// ----------------------------------------------------------------------------- +// Private Inbox +// ----------------------------------------------------------------------------- + +/// A structure that contains inbox messages. Some messages are left inside and +/// continuously re-read. Others are taken out, but may potentially be put back +/// for later reading. Later reading in this case implies that they are put back +/// for reading in the next sync round. +struct Inbox { + messages: RawVec, + next_delay_idx: u32, + start_read_idx: u32, + next_read_idx: u32, + generation: u32, +} + +#[derive(Clone, Copy)] +pub(crate) struct MessageTicket { + index: u32, + generation: u32, +} + +impl Inbox { + fn new() -> Self { + return Inbox { + messages: RawVec::new(), + next_delay_idx: 0, + start_read_idx: 0, + next_read_idx: 0, + generation: 0, + } + } + + fn insert_new(&mut self, message: Message) { + assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size + self.messages.push(message); + } + + fn get_next_message_ticket(&mut self) -> Option { + let cur_read_idx = self.next_read_idx as usize; + if cur_read_idx >= self.messages.len() { + return None; + } + + self.generation += 1; + self.next_read_idx += 1; + return Some(MessageTicket{ + index: cur_read_idx as u32, + generation: self.generation + }); + } + + fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message { + debug_assert_eq!(self.generation, ticket.generation); + return unsafe{ &*self.messages.get(ticket.index as usize) } + } + + fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message { + debug_assert_eq!(self.generation, ticket.generation); + unsafe { + let take_idx = ticket.index as usize; + let val = std::ptr::read(self.messages.get(take_idx)); + + // Move messages to the right, clearing up space in the + // front. + let num_move_right = take_idx - self.start_read_idx as usize; + self.messages.move_range( + self.start_read_idx as usize, + self.start_read_idx as usize + 1, + num_move_right + ); + + self.start_read_idx += 1; + + return val; + } + } + + fn put_back_message(&mut self, message: Message) { + // We have space in front of the array because we've taken out a message + // before. + debug_assert!(self.next_delay_idx < self.start_read_idx); + unsafe { + // Write to front of the array + std::ptr::write(self.messages.get_mut(self.next_delay_idx as usize), message); + self.next_delay_idx += 1; + } + } + + fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter { + return MessagesIter{ + messages: self.messages.as_slice(), + next_index: self.start_read_idx as usize, + max_index: self.next_read_idx as usize, + match_port_id + }; + } + + fn prepare_for_next_round(&mut self) { + // Deallocate everything that was read + self.destroy_range(self.start_read_idx, self.next_read_idx); + self.generation += 1; + + // Join up all remaining values with the delayed ones in the front + let num_to_move = self.messages.len() - self.next_read_idx as usize; + self.messages.move_range( + self.next_read_idx as usize, + self.next_delay_idx as usize, + num_to_move + ); + + // Set all indices (and the RawVec len) to make sense in this new state + let new_len = self.next_delay_idx as usize + num_to_move; + self.next_delay_idx = 0; + self.start_read_idx = 0; + self.next_read_idx = 0; + self.messages.len = new_len; + } + + fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) { + // Convoluted assert to make sure we're in non-sync mode, as that is + // when this is called, and that makes our lives easier + let mut idx = 0; + while idx < self.messages.len() { + let message = unsafe{ &*self.messages.get(idx) }; + if let Some(target_port) = message.target_port() { + if target_port == port { + // Transfer port + unsafe { + let message = std::ptr::read(message as *const _); + let remaining = self.messages.len() - idx; + if remaining > 1 { + self.messages.move_range(idx + 1, idx, remaining - 1); + } + self.messages.len -= 1; + new_inbox.insert_new(message); + } + } else { + // Do not transfer port + idx += 1; + } + } + } + } + + #[inline] + fn destroy_range(&mut self, start_idx: u32, end_idx: u32) { + for idx in (start_idx as usize)..(end_idx as usize) { + unsafe { + let msg = self.messages.get_mut(idx); + std::ptr::drop_in_place(msg); + } + } + } +} + +impl Drop for Inbox { + fn drop(&mut self) { + // Whether in sync or not in sync. We have two ranges of allocated + // messages: + // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync) + // - readable messages: from `start_read_idx` to `messages.len` + self.destroy_range(0, self.next_delay_idx); + self.destroy_range(self.start_read_idx, self.messages.len as u32); + } +} + // ----------------------------------------------------------------------------- // Control messages // ----------------------------------------------------------------------------- @@ -658,12 +787,14 @@ struct ControlEntry { enum ControlVariant { ChangedPort(ControlChangedPort), ClosedChannel(ControlClosedChannel), + ReroutePending, } struct ControlChangedPort { target_port: PortIdLocal, // if send to this port, then reroute source_connector: ConnectorId, // connector we expect messages from target_connector: ConnectorId, // connector we need to reroute to + id_of_ack_after_confirmation: u32, // control message ID we need to send to the target upon receiving an ack } struct ControlClosedChannel { @@ -714,19 +845,26 @@ impl ControlMessageHandler { &mut self, port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId, peer_connector_id: ConnectorId, - new_owner_connector_id: ConnectorId + new_owner_connector_id: ConnectorId, new_owner_ctrl_handler: &mut ControlMessageHandler, ) -> ControlMessage { let id = self.take_id(); + let new_owner_id = new_owner_ctrl_handler.take_id(); self.active.push(ControlEntry{ id, variant: ControlVariant::ChangedPort(ControlChangedPort{ target_port: port_id, source_connector: peer_connector_id, target_connector: new_owner_connector_id, + id_of_ack_after_confirmation: new_owner_id, }), }); + new_owner_ctrl_handler.active.push(ControlEntry{ + id: new_owner_id, + variant: ControlVariant::ReroutePending, + }); + return ControlMessage { id, sending_component_id: self_connector_id, @@ -749,14 +887,33 @@ impl ControlMessageHandler { return None; } - /// Handles an Ack as an answer to a previously sent control message - pub fn handle_ack(&mut self, id: u32) { + /// Handles an Ack as an answer to a previously sent control message. + /// Handling an Ack might spawn a new message that needs to be sent. + pub fn handle_ack(&mut self, handler_component_id: ConnectorId, id: u32) -> Option<(ConnectorId, Message)> { let index = self.active.iter() .position(|v| v.id == id); match index { - Some(index) => { self.active.remove(index); }, - None => { todo!("handling of nefarious ACKs"); }, + Some(index) => { + let removed = self.active.remove(index); + match removed.variant { + ControlVariant::ChangedPort(message) => { + return Some(( + message.target_connector, + Message::Control(ControlMessage{ + id: message.id_of_ack_after_confirmation, + sending_component_id: handler_component_id, + content: ControlContent::Ack + }) + )); + }, + _ => return None, + } + }, + None => { + todo!("handling of nefarious ACKs"); + return None; + }, } }