diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index 964cb904b6e1f08be7932025bff98c05e14de662..a06f30aa2175ec773f37e8da56cd7fe2094a4017 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -65,7 +65,6 @@ impl Scheduler {
                 // Nothing to do. But we're still waiting for all our pending
                 // control messages to be answered.
                 self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
-                self.handle_inbox_while_shutting_down(scheduled);
                 if scheduled.router.num_pending_acks() == 0 {
                     // We're actually done, we can safely destroy the
                     // currently running connector
@@ -118,6 +117,13 @@ impl Scheduler {
             }
         }
 
+        // Any messages still in the public inbox should be handled
+        scheduled.ctx.inbox.clear_read_messages();
+        while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
+            let message = scheduled.ctx.take_message_using_ticket(ticket);
+            self.handle_message_while_shutting_down(message, scheduled);
+        }
+
         if scheduled.router.num_pending_acks() == 0 {
             // All ports (if any) already closed
             self.runtime.destroy_component(connector_key);
@@ -149,95 +155,86 @@ impl Scheduler {
 
         // If here, then we should handle the message
         self.debug_conn(connector_id, " ... Handling the message");
-
-        match message {
-            Message::Control(message) => {
-                match message.content {
-                    ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
-                        // Need to change port target
-                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
-                        port.peer_connector = new_target_connector_id;
-
-                        // Note: for simplicity we program the scheduler to always finish
-                        // running a connector with an empty outbox. If this ever changes
-                        // then accepting the "port peer changed" message implies we need
-                        // to change the recipient of the message in the outbox.
-                        debug_assert!(scheduled.ctx.outbox.is_empty());
-
-                        // And respond with an Ack
-                        let ack_message = Message::Control(ControlMessage {
-                            id: message.id,
-                            sending_component_id: connector_id,
-                            content: ControlContent::Ack,
-                        });
-                        self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
-                        self.runtime.send_message(message.sending_component_id, ack_message);
-                    },
-                    ControlContent::CloseChannel(port_id) => {
-                        // Mark the port as being closed
-                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
-                        port.state = PortState::Closed;
-
-                        // Send an Ack
-                        let ack_message = Message::Control(ControlMessage {
-                            id: message.id,
-                            sending_component_id: connector_id,
-                            content: ControlContent::Ack,
-                        });
-                        self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
-                        self.runtime.send_message(message.sending_component_id, ack_message);
-                    },
-                    ControlContent::Ack => {
-                        if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) {
-                            self.debug_conn(connector_id, &format!("Sending message [ack ack] \n --- {:?}", new_control_message));
-                            self.runtime.send_message(target_component, new_control_message);
-                        };
-                    },
-                    ControlContent::Ping => {},
-                }
-            },
-            _ => {
-                // All other cases have to be handled by the component
+        if let Message::Control(message) = &message {
+            match message.content {
+                ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
+                    // Need to change port target
+                    let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
+                    port.peer_connector = new_target_connector_id;
+
+                    // Note: for simplicity we program the scheduler to always finish
+                    // running a connector with an empty outbox. If this ever changes
+                    // then accepting the "port peer changed" message implies we need
+                    // to change the recipient of the message in the outbox.
+                    debug_assert!(scheduled.ctx.outbox.is_empty());
+
+                    // And respond with an Ack
+                    let ack_message = Message::Control(ControlMessage {
+                        id: message.id,
+                        sending_component_id: connector_id,
+                        content: ControlContent::Ack,
+                    });
+                    self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
+                    self.runtime.send_message(message.sending_component_id, ack_message);
+                },
+                ControlContent::CloseChannel(port_id) => {
+                    // Mark the port as being closed
+                    let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
+                    port.state = PortState::Closed;
+
+                    // Send an Ack
+                    let ack_message = Message::Control(ControlMessage {
+                        id: message.id,
+                        sending_component_id: connector_id,
+                        content: ControlContent::Ack,
+                    });
+                    self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
+                    self.runtime.send_message(message.sending_component_id, ack_message);
+                },
+                ControlContent::Ack => {
+                    if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) {
+                        self.debug_conn(connector_id, &format!("Sending message [ack ack] \n --- {:?}", new_control_message));
+                        self.runtime.send_message(target_component, new_control_message);
+                    };
+                },
+                ControlContent::Ping => {},
+            }
+        } else {
+            // Not a control message
+            if scheduled.shutting_down {
+                // Since we're shutting down, we just want to respond with a
+                // message saying the message did not arrive.
+                debug_assert!(scheduled.ctx.inbox.get_next_message_ticket().is_none()); // public inbox should be completely cleared
+                self.handle_message_while_shutting_down(message, scheduled);
+            } else {
                 scheduled.ctx.inbox.insert_new(message);
             }
         }
     }
 
-    /// Handles inbox messages while shutting down. This intends to handle the
-    /// case where a component cleanly exited outside of a sync region, but a
-    /// peer, before receiving the `CloseChannel` message, sent a message inside
-    /// a sync region. This peer should be notified that its message is not
-    /// received by a component in a sync region.
-    fn handle_inbox_while_shutting_down(&mut self, scheduled: &mut ScheduledConnector) {
-        // Note: we're not handling the public inbox, we're dealing with the
-        // private one!
-        debug_assert!(scheduled.shutting_down);
-
-        while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
-            let message = scheduled.ctx.read_message_using_ticket(ticket);
-            let target_port_and_round_number = match message {
-                Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
-                Message::SyncComp(_) => None,
-                Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
-                Message::SyncControl(_) => None,
-                Message::Control(_) => None,
-            };
+    fn handle_message_while_shutting_down(&mut self, message: Message, scheduled: &mut ScheduledConnector) {
+        let target_port_and_round_number = match message {
+            Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
+            Message::SyncComp(_) => None,
+            Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
+            Message::SyncControl(_) => None,
+            Message::Control(_) => None,
+        };
 
-            if let Some((target_port, sync_round)) = target_port_and_round_number {
-                // This message is aimed at a port, but we're shutting down, so
-                // notify the peer that its was not received properly.
-                // (also: since we're shutting down, we're not in sync mode and
-                // the context contains the definitive set of owned ports)
-                let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
-                let message = SyncControlMessage{
-                    in_response_to_sync_round: sync_round,
-                    target_component_id: port.peer_connector,
-                    content: SyncControlContent::ChannelIsClosed(port.peer_id),
-                };
-                self.debug_conn(scheduled.ctx.id, &format!("Sending message [shutdown]\n --- {:?}", message));
-                self.runtime.send_message(port.peer_connector, Message::SyncControl(message));
-            }
+        if let Some((target_port, sync_round)) = target_port_and_round_number {
+            // This message is aimed at a port, but we're shutting down, so
+            // notify the peer that it was not received properly.
+            // (also: since we're shutting down, we're not in sync mode and
+            // the context contains the definitive set of owned ports)
+            let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
+            let message = SyncControlMessage{
+                in_response_to_sync_round: sync_round,
+                target_component_id: port.peer_connector,
+                content: SyncControlContent::ChannelIsClosed(port.peer_id),
+            };
+            self.debug_conn(scheduled.ctx.id, &format!("Sending message [shutdown]\n --- {:?}", message));
+            self.runtime.send_message(port.peer_connector, Message::SyncControl(message));
         }
     }
 
@@ -351,7 +348,7 @@ impl Scheduler {
         } else {
             // Just left sync region. So prepare inbox for the next sync
             // round
-            scheduled.ctx.inbox.prepare_for_next_round();
+            scheduled.ctx.inbox.clear_read_messages();
         }
 
         scheduled.ctx.changed_in_sync = false; // reset flag
@@ -617,6 +614,8 @@ impl<'a> Iterator for MessagesIter<'a> {
 /// for later reading. Later reading in this case implies that they are put back
 /// for reading in the next sync round.
 struct Inbox {
+    temp_m: Vec<Message>,
+    temp_d: Vec<Message>,
     messages: RawVec<Message>,
     next_delay_idx: u32,
     start_read_idx: u32,
@@ -633,6 +632,7 @@ pub(crate) struct MessageTicket {
 impl Inbox {
     fn new() -> Self {
         return Inbox {
+            temp_m: Vec::new(), temp_d: Vec::new(),
             messages: RawVec::new(),
             next_delay_idx: 0,
             start_read_idx: 0,
@@ -643,10 +643,17 @@ impl Inbox {
 
     fn insert_new(&mut self, message: Message) {
         assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size
+        self.temp_m.push(message);
+        return;
         self.messages.push(message);
     }
 
     fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
+        if self.next_read_idx as usize >= self.temp_m.len() { return None };
+        let idx = self.next_read_idx;
+        self.generation += 1;
+        self.next_read_idx += 1;
+        return Some(MessageTicket{ index: idx, generation: self.generation });
         let cur_read_idx = self.next_read_idx as usize;
         if cur_read_idx >= self.messages.len() {
             return None;
@@ -662,11 +669,15 @@ impl Inbox {
 
     fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
         debug_assert_eq!(self.generation, ticket.generation);
+        return &self.temp_m[ticket.index as usize];
         return unsafe{ &*self.messages.get(ticket.index as usize) };
     }
 
     fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
         debug_assert_eq!(self.generation, ticket.generation);
+        debug_assert!(ticket.index < self.next_read_idx);
+        self.next_read_idx -= 1;
+        return self.temp_m.remove(ticket.index as usize);
         unsafe {
             let take_idx = ticket.index as usize;
             let val = std::ptr::read(self.messages.get(take_idx));
@@ -689,6 +700,8 @@ impl Inbox {
     fn put_back_message(&mut self, message: Message) {
         // We have space in front of the array because we've taken out a message
         // before.
+        self.temp_d.push(message);
+        return;
         debug_assert!(self.next_delay_idx < self.start_read_idx);
         unsafe {
             // Write to front of the array
@@ -698,6 +711,12 @@ impl Inbox {
     }
 
     fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
+        return MessagesIter{
+            messages: self.temp_m.as_slice(),
+            next_index: self.start_read_idx as usize,
+            max_index: self.next_read_idx as usize,
+            match_port_id
+        };
         return MessagesIter{
             messages: self.messages.as_slice(),
             next_index: self.start_read_idx as usize,
@@ -706,7 +725,13 @@ impl Inbox {
         };
     }
 
-    fn prepare_for_next_round(&mut self) {
+    fn clear_read_messages(&mut self) {
+        self.temp_m.drain(0..self.next_read_idx as usize);
+        for (idx, v) in self.temp_d.drain(..).enumerate() {
+            self.temp_m.insert(idx, v);
+        }
+        self.next_read_idx = 0;
+        return;
         // Deallocate everything that was read
         self.destroy_range(self.start_read_idx, self.next_read_idx);
         self.generation += 1;
@@ -728,8 +753,21 @@ impl Inbox {
     }
 
     fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) {
-        // Convoluted assert to make sure we're in non-sync mode, as that is
-        // when this is called, and that makes our lives easier
+        debug_assert!(self.temp_d.is_empty());
+        let mut idx = 0;
+        while idx < self.temp_m.len() {
+            let msg = &self.temp_m[idx];
+            if let Some(target) = msg.target_port() {
+                if target == port {
+                    new_inbox.temp_m.push(self.temp_m.remove(idx));
+                    continue;
+                }
+            }
+
+            idx += 1;
+        }
+        return;
+
         let mut idx = 0;
         while idx < self.messages.len() {
             let message = unsafe{ &*self.messages.get(idx) };
@@ -738,18 +776,19 @@ impl Inbox {
                 // Transfer port
                 unsafe {
                     let message = std::ptr::read(message as *const _);
-                    let remaining = self.messages.len() - idx;
-                    if remaining > 1 {
-                        self.messages.move_range(idx + 1, idx, remaining - 1);
+                    let remaining = self.messages.len() - idx - 1; // idx < len, due to loop condition
+                    if remaining > 0 {
+                        self.messages.move_range(idx + 1, idx, remaining);
                     }
                     self.messages.len -= 1;
                     new_inbox.insert_new(message);
                 }
-            } else {
-                // Do not transfer port
-                idx += 1;
+
+                continue; // do not increment index
             }
+
+            idx += 1;
         }
     }
 
@@ -763,17 +802,17 @@ impl Inbox {
 }
-
-impl Drop for Inbox {
-    fn drop(&mut self) {
-        // Whether in sync or not in sync. We have two ranges of allocated
-        // messages:
-        // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync)
-        // - readable messages: from `start_read_idx` to `messages.len`
-        self.destroy_range(0, self.next_delay_idx);
-        self.destroy_range(self.start_read_idx, self.messages.len as u32);
-    }
-}
+//
+// impl Drop for Inbox {
+//     fn drop(&mut self) {
+//         // Whether in sync or not in sync. We have two ranges of allocated
+//         // messages:
+//         // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync)
+//         // - readable messages: from `start_read_idx` to `messages.len`
+//         self.destroy_range(0, self.next_delay_idx);
+//         self.destroy_range(self.start_read_idx, self.messages.len as u32);
+//     }
+// }
 
 // -----------------------------------------------------------------------------
 // Control messages
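To make the new inbox discipline easier to review, here is a minimal, self-contained sketch of the `Vec`-backed scheme this patch moves toward (`temp_m` holding readable messages, `temp_d` holding put-back messages). The `Message` payload, field names, and the `main` driver are illustrative stand-ins, not the runtime's actual types; only the method names mirror the patch.

```rust
// Sketch only: a simplified model of the patched Inbox. `Message(u32)` is a
// hypothetical stand-in for the runtime's Message enum.
#[derive(Debug, PartialEq)]
struct Message(u32);

#[derive(Clone, Copy)]
struct MessageTicket {
    index: u32,
    generation: u32,
}

struct Inbox {
    messages: Vec<Message>, // mirrors `temp_m`: messages readable this round
    delayed: Vec<Message>,  // mirrors `temp_d`: messages put back for the next round
    next_read_idx: u32,
    generation: u32,
}

impl Inbox {
    fn new() -> Self {
        Inbox { messages: Vec::new(), delayed: Vec::new(), next_read_idx: 0, generation: 0 }
    }

    fn insert_new(&mut self, message: Message) {
        self.messages.push(message);
    }

    // Hands out a ticket for the next unread message. Bumping the generation
    // invalidates older tickets whenever the inbox shifts underneath them.
    fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
        if self.next_read_idx as usize >= self.messages.len() { return None; }
        let idx = self.next_read_idx;
        self.generation += 1;
        self.next_read_idx += 1;
        Some(MessageTicket { index: idx, generation: self.generation })
    }

    fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
        debug_assert_eq!(self.generation, ticket.generation);
        debug_assert!(ticket.index < self.next_read_idx);
        self.next_read_idx -= 1; // taking un-reads the slot
        self.messages.remove(ticket.index as usize)
    }

    fn put_back_message(&mut self, message: Message) {
        self.delayed.push(message);
    }

    // Drops everything that was read but not taken, then moves delayed
    // messages to the front so the next round sees them first.
    fn clear_read_messages(&mut self) {
        self.messages.drain(0..self.next_read_idx as usize);
        for (idx, v) in self.delayed.drain(..).enumerate() {
            self.messages.insert(idx, v);
        }
        self.next_read_idx = 0;
    }
}

fn main() {
    let mut inbox = Inbox::new();
    inbox.insert_new(Message(1));
    inbox.insert_new(Message(2));

    let ticket = inbox.get_next_message_ticket().unwrap();
    let first = inbox.take_message_using_ticket(ticket);
    assert_eq!(first, Message(1));

    inbox.put_back_message(first); // defer to the next round
    inbox.clear_read_messages();

    // The deferred message is at the front again for the next round.
    let ticket = inbox.get_next_message_ticket().unwrap();
    assert_eq!(inbox.take_message_using_ticket(ticket), Message(1));
}
```

Note the invariant the patch relies on: `take_message_using_ticket` decrements `next_read_idx`, so `clear_read_messages` drains only messages that were read but never taken, while put-back messages are re-inserted at the front for the next sync round.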