diff --git a/src/collections/raw_vec.rs b/src/collections/raw_vec.rs index 088430ecf66725df09cb21e7c4edf7b675f80a37..c1b4806d59ab89348ef4c6225d1660cf1a1291aa 100644 --- a/src/collections/raw_vec.rs +++ b/src/collections/raw_vec.rs @@ -11,10 +11,12 @@ enum AllocError { /// ensuring that no illegal mutable access occurs. /// A lot of the logic is simply stolen from the std lib. The destructor will /// free the backing memory, but will not run any destructors. +/// Try to use functions to modify the length. But feel free if you know what +/// you're doing pub struct RawVec { base: *mut T, cap: usize, - len: usize, + pub len: usize, } impl RawVec { @@ -38,16 +40,19 @@ impl RawVec { return result; } + #[inline] pub unsafe fn get(&self, idx: usize) -> *const T { debug_assert!(idx < self.len); return self.base.add(idx); } + #[inline] pub unsafe fn get_mut(&self, idx: usize) -> *mut T { debug_assert!(idx < self.len); return self.base.add(idx); } + /// Pushes a new element to the end of the list. pub fn push(&mut self, item: T) { self.ensure_space(1).unwrap(); unsafe { @@ -57,10 +62,30 @@ impl RawVec { } } + /// Moves the elements in the range [from_idx, from_idx + num_to_move) to + /// the range [to_idx, to_idx + num_to_move). Caller must make sure that all + /// non-overlapping elements of the second range had their destructor called + /// in case those elements were used. + pub fn move_range(&mut self, from_idx: usize, to_idx: usize, num_to_move: usize) { + debug_assert!(from_idx + num_to_move <= self.len); + debug_assert!(to_idx + num_to_move <= self.len); // maybe not in future, for now this is fine + unsafe { + let source = self.base.add(from_idx); + let target = self.base.add(to_idx); + std::ptr::copy(source, target, num_to_move); + } + } + pub fn len(&self) -> usize { return self.len; } + pub fn as_slice(&self) -> &[T] { + return unsafe{ + std::slice::from_raw_parts(self.base, self.len) + }; + } + fn ensure_space(&mut self, additional: usize) -> Result<(), AllocError>{ debug_assert!(Self::T_SIZE != 0); debug_assert!(self.cap >= self.len); diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 5495680cd51dc7f94a8de2fa83a3e7a6d0eba35d..58db4129573181cf62b53c3c60010562499da244 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -39,7 +39,7 @@ use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_va use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, SyncControlMessage, PublicInbox}; use super::native::Connector; use super::port::{PortKind, PortIdLocal}; -use super::scheduler::{ComponentCtx, SchedulerCtx}; +use super::scheduler::{ComponentCtx, SchedulerCtx, MessageTicket}; pub(crate) struct ConnectorPublic { pub inbox: PublicInbox, @@ -185,35 +185,45 @@ impl ConnectorPDL { // --- Handling messages pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) -> Option { - while let Some(message) = ctx.read_next_message() { - match message { - Message::Data(message) => self.handle_new_data_message(message, ctx), - Message::SyncComp(message) => { - if let Some(result) = self.handle_new_sync_comp_message(message, ctx) { - return Some(result); - } - }, - Message::SyncPort(message) => self.handle_new_sync_port_message(message, ctx), - Message::SyncControl(message) => { - if let Some(result) = self.handle_new_sync_control_message(message, ctx) { - return Some(result); - } - }, - Message::Control(_) => unreachable!("control message in component"), + while let Some(ticket) = ctx.get_next_message_ticket() { + let message = ctx.read_message_using_ticket(ticket); + let immediate_result = if let Message::Data(_) = message { + self.handle_new_data_message(ticket, ctx); + None + } else { + match ctx.take_message_using_ticket(ticket) { + Message::Data(_) => unreachable!(), + Message::SyncComp(message) => { + self.handle_new_sync_comp_message(message, ctx) + }, + Message::SyncPort(message) => { + self.handle_new_sync_port_message(message, ctx); + None + }, + Message::SyncControl(message) => { + self.handle_new_sync_control_message(message, ctx) + }, + Message::Control(_) => unreachable!("control message in component"), + } + }; + + if let Some(result) = immediate_result { + return Some(result); } } return None; } - pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { + pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. - if !self.consensus.handle_new_data_message(&message, ctx) { - // Old message, so drop it + if self.consensus.handle_new_data_message(ticket, ctx) { + // Message should not be handled now return; } + let message = ctx.read_message_using_ticket(ticket).as_data(); let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); while let Some(branch_id) = iter_id { iter_id = self.tree.get_queue_next(branch_id); diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 37401255e873261b6b31e02fe0431fbca980a740..d15b6edfdaf3699bb4e01ca26bd67e073551fd28 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -12,7 +12,7 @@ use super::inbox::{ SyncPortMessage, SyncPortContent, SyncControlMessage, SyncControlContent }; -use super::scheduler::{ComponentCtx, ComponentPortChange}; +use super::scheduler::{ComponentCtx, ComponentPortChange, MessageTicket}; struct BranchAnnotation { channel_mapping: Vec, @@ -88,6 +88,13 @@ pub(crate) enum Consistency { Inconsistent, } +#[derive(PartialEq, Eq)] +pub(crate) enum MessageOrigin { + Past, + Present, + Future +} + impl Consensus { pub fn new() -> Self { return Self { @@ -375,20 +382,34 @@ impl Consensus { /// Handles a new data message by handling the sync header. The caller is /// responsible for checking for branches that might be able to receive /// the message. - pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool { - let handled = self.handle_received_sync_header(&message.sync_header, ctx); - if handled { - self.encountered_ports.push(message.data_header.target_port); + pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) -> bool { + let message = ctx.read_message_using_ticket(ticket).as_data(); + let target_port = message.data_header.target_port; + match self.handle_received_sync_header(message.sync_header, ctx) { + MessageOrigin::Past => return false, + MessageOrigin::Present => { + self.encountered_ports.push(target_port); + return true; + }, + MessageOrigin::Future => { + let message = ctx.take_message_using_ticket(ticket); + ctx.put_back_message(message); + return false; + } } - return handled; } /// Handles a new sync message by handling the sync header and the contents /// of the message. Returns `Some` with the branch ID of the global solution /// if the sync solution has been found. pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option { - if !self.handle_received_sync_header(&message.sync_header, ctx) { - return None; + match self.handle_received_sync_header(message.sync_header, ctx) { + MessageOrigin::Past => return None, + MessageOrigin::Present => {}, + MessageOrigin::Future => { + ctx.put_back_message(Message::SyncComp(message)); + return None + } } // And handle the contents @@ -427,8 +448,13 @@ impl Consensus { } pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) -> Option { - if !self.handle_received_sync_header(&message.sync_header, ctx) { - return None; + match self.handle_received_sync_header(message.sync_header, ctx) { + MessageOrigin::Past => return None, + MessageOrigin::Present => {}, + MessageOrigin::Future => { + ctx.put_back_message(Message::SyncPort(message)); + return None; + } } debug_assert!(self.is_in_sync()); @@ -482,11 +508,11 @@ impl Consensus { }); } - let maybe_conlusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{ + let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{ component_id: ctx.id, channels, }), ctx); - return maybe_conlusion; + return maybe_conclusion; } } } @@ -497,6 +523,10 @@ impl Consensus { return None } + // Because the message is always sent in response to a message + // originating here, the sync round number can never be larger than the + // currently stored one. + debug_assert_eq!(message.in_response_to_sync_round, self.sync_round); match message.content { SyncControlContent::ChannelIsClosed(_) => { return self.initiate_sync_failure(ctx); @@ -560,11 +590,12 @@ impl Consensus { // --- Internal helpers - fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool { + fn handle_received_sync_header(&mut self, sync_header: SyncHeader, ctx: &mut ComponentCtx) -> MessageOrigin { debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves - if !self.handle_peer(sync_header) { - // We can drop this package - return false; + let origin = self.handle_peer(&sync_header); + if origin != MessageOrigin::Present { + // We do not have to handle it now + return origin; } if sync_header.highest_component_id > self.highest_connector_id { @@ -598,23 +629,35 @@ impl Consensus { ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } // else: exactly equal, so do nothing - return true; + return MessageOrigin::Present; } /// Handles a (potentially new) peer. Returns `false` if the provided sync /// number is different then the expected one. - fn handle_peer(&mut self, sync_header: &SyncHeader) -> bool { + fn handle_peer(&mut self, sync_header: &SyncHeader) -> MessageOrigin { let position = self.peers.iter().position(|v| v.id == sync_header.sending_component_id); match position { Some(index) => { let entry = &mut self.peers[index]; - entry.encountered_this_round = true; - // TODO: Proper handling of potential overflow - if sync_header.sync_round >= entry.expected_sync_round { - entry.expected_sync_round = sync_header.sync_round; - return true; + if entry.encountered_this_round { + // Already encountered this round + if sync_header.sync_round < entry.expected_sync_round { + return MessageOrigin::Past; + } else if sync_header.sync_round == entry.expected_sync_round { + return MessageOrigin::Present; + } else { + return MessageOrigin::Future; + } } else { - return false; + // TODO: Proper handling of potential overflow + entry.encountered_this_round = true; + + if sync_header.sync_round >= entry.expected_sync_round { + entry.expected_sync_round = sync_header.sync_round; + return MessageOrigin::Present; + } else { + return MessageOrigin::Past; + } } }, None => { @@ -623,7 +666,7 @@ impl Consensus { encountered_this_round: true, expected_sync_round: sync_header.sync_round, }); - return true; + return MessageOrigin::Present; } } } diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index 1260a183ef590ff04c7fd96b068a795795d9565d..9175a2239cf60bb154e70b9e28c11d94d1d02fe4 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -42,7 +42,7 @@ impl BranchMarker { } /// The header added by the synchronization algorithm to all. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub(crate) struct SyncHeader { pub sending_component_id: ConnectorId, pub highest_component_id: ConnectorId, @@ -164,6 +164,32 @@ impl Message { Message::Control(_) => return None, } } + + /// If the message is sent through a particular channel, then this function + /// returns the target port through which the message was sent. + pub(crate) fn target_port(&self) -> Option { + match self { + Message::Data(message) => return Some(message.data_header.target_port), + Message::SyncPort(message) => return Some(message.target_port), + Message::SyncComp(_) => return None, + Message::SyncControl(_) => return None, + Message::Control(message) => { + match &message.content { + ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id), + ControlContent::CloseChannel(port_id) => return Some(*port_id), + ControlContent::Ping => return None, + ControlContent::Ack => return None, + } + } + } + } + + pub(crate) fn as_data(&self) -> &DataMessage { + match self { + Message::Data(v) => v, + _ => unreachable!(), + } + } } /// The public inbox of a connector. The thread running the connector that owns diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 7fa49811e7ed9d11bc76bfe75864b66da837015e..bd18c4424c96977ba644e6e430c04867dadc3b14 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -9,7 +9,7 @@ use crate::runtime2::consensus::RoundConclusion; use super::{ConnectorKey, ConnectorId, RuntimeInner}; use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState}; -use super::scheduler::{SchedulerCtx, ComponentCtx}; +use super::scheduler::{SchedulerCtx, ComponentCtx, MessageTicket}; use super::port::{Port, PortIdLocal, Channel, PortKind}; use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; use super::connector::{ConnectorScheduling, ConnectorPDL}; @@ -111,27 +111,33 @@ impl ConnectorApplication { } fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) { - while let Some(message) = comp_ctx.read_next_message() { - match message { - Message::Data(message) => self.handle_new_data_message(message, comp_ctx), - Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx), - Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx), - Message::SyncControl(message) => todo!("implement"), - Message::Control(_) => unreachable!("control message in native API component"), + while let Some(ticket) = comp_ctx.get_next_message_ticket() { + let message = comp_ctx.read_message_using_ticket(ticket); + if let Message::Data(_) = message { + self.handle_new_data_message(ticket, comp_ctx) + } else { + match comp_ctx.take_message_using_ticket(ticket) { + Message::Data(message) => unreachable!(), + Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx), + Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx), + Message::SyncControl(message) => todo!("implement"), + Message::Control(_) => unreachable!("control message in native API component"), + } } } } - pub(crate) fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { + pub(crate) fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. - if !self.consensus.handle_new_data_message(&message, ctx) { + if !self.consensus.handle_new_data_message(ticket, ctx) { // Old message, so drop it return; } let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); while let Some(branch_id) = iter_id { + let message = ctx.read_message_using_ticket(ticket).as_data(); iter_id = self.tree.get_queue_next(branch_id); let branch = &self.tree[branch_id]; diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 43574bcd840dadf478a2bd9ecd610dc7987b1709..964cb904b6e1f08be7932025bff98c05e14de662 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,6 +1,8 @@ use std::collections::VecDeque; +use std::mem::MaybeUninit; use std::sync::Arc; use std::sync::atomic::Ordering; +use crate::collections::RawVec; use crate::protocol::eval::EvalError; use crate::runtime2::port::ChannelId; @@ -137,7 +139,7 @@ impl Scheduler { // Check if the message has to be rerouted because we have moved the // target port to another component. self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message)); - if let Some(target_port) = Self::get_message_target_port(&message) { + if let Some(target_port) = message.target_port() { if let Some(other_component_id) = scheduled.router.should_reroute(target_port) { self.debug_conn(connector_id, " ... Rerouting the message"); self.runtime.send_message(other_component_id, message); @@ -186,14 +188,17 @@ impl Scheduler { self.runtime.send_message(message.sending_component_id, ack_message); }, ControlContent::Ack => { - scheduled.router.handle_ack(message.id); + if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) { + self.debug_conn(connector_id, &format!("Sending message [ack ack] \n --- {:?}", new_control_message)); + self.runtime.send_message(target_component, new_control_message); + }; }, ControlContent::Ping => {}, } }, _ => { // All other cases have to be handled by the component - scheduled.ctx.inbox_messages.push(message); + scheduled.ctx.inbox.insert_new(message); } } } @@ -208,8 +213,10 @@ impl Scheduler { // Note: we're not handling the public inbox, we're dealing with the // private one! debug_assert!(scheduled.shutting_down); - while let Some(message) = scheduled.ctx.read_next_message_even_if_not_in_sync() { - let target_port_and_round_number = match &message { + + while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() { + let message = scheduled.ctx.read_message_using_ticket(ticket); + let target_port_and_round_number = match message { Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)), Message::SyncComp(_) => None, Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)), @@ -293,21 +300,7 @@ impl Scheduler { for port_id in initial_ports { // Transfer messages associated with the transferred port - let mut message_idx = 0; - while message_idx < scheduled.ctx.inbox_messages.len() { - let message = &scheduled.ctx.inbox_messages[message_idx]; - if Self::get_message_target_port(message) == Some(port_id) { - // Need to transfer this message - // TODO: Revise messages, this is becoming messy and error-prone - let message = scheduled.ctx.inbox_messages.remove(message_idx); - if message_idx < scheduled.ctx.inbox_len_read { - scheduled.ctx.inbox_len_read -= 1; - } - new_connector.ctx.inbox_messages.push(message); - } else { - message_idx += 1; - } - } + scheduled.ctx.inbox.transfer_messages_for_port(port_id, &mut new_connector.ctx.inbox); // Transfer the port itself let port_index = scheduled.ctx.ports.iter() @@ -322,7 +315,8 @@ impl Scheduler { if port.state == PortState::Open { let reroute_message = scheduled.router.prepare_reroute( port.self_id, port.peer_id, scheduled.ctx.id, - port.peer_connector, new_connector.ctx.id + port.peer_connector, new_connector.ctx.id, + &mut new_connector.router ); self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message)); @@ -355,10 +349,9 @@ impl Scheduler { if scheduled.ctx.is_in_sync { // Just entered sync region } else { - // Just left sync region. So clear inbox up until the last - // message that was read. - scheduled.ctx.inbox_messages.drain(0..scheduled.ctx.inbox_len_read); - scheduled.ctx.inbox_len_read = 0; + // Just left sync region. So prepare inbox for the next sync + // round + scheduled.ctx.inbox.prepare_for_next_round(); } scheduled.ctx.changed_in_sync = false; // reset flag @@ -388,25 +381,6 @@ impl Scheduler { } } - #[inline] - fn get_message_target_port(message: &Message) -> Option { - match message { - Message::Data(data) => return Some(data.data_header.target_port), - Message::SyncComp(_) => {}, - Message::SyncPort(content) => return Some(content.target_port), - Message::SyncControl(_) => return None, - Message::Control(control) => { - match &control.content { - ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id), - ControlContent::CloseChannel(port_id) => return Some(*port_id), - ControlContent::Ping | ControlContent::Ack => {}, - } - }, - } - - return None - } - // TODO: Remove, this is debugging stuff fn debug(&self, message: &str) { println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); @@ -442,8 +416,7 @@ pub(crate) struct ComponentCtx { // Mostly managed by the scheduler pub(crate) id: ConnectorId, ports: Vec, - inbox_messages: Vec, - inbox_len_read: usize, + inbox: Inbox, // Submitted by the component is_in_sync: bool, changed_in_sync: bool, @@ -462,8 +435,7 @@ impl ComponentCtx { return Self{ id: ConnectorId::new_invalid(), ports: Vec::new(), - inbox_messages: Vec::new(), - inbox_len_read: 0, + inbox: Inbox::new(), is_in_sync: false, changed_in_sync: false, outbox: VecDeque::new(), @@ -570,43 +542,33 @@ impl ComponentCtx { /// those messages that have been previously received with /// `read_next_message`. pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter { - return MessagesIter { - messages: &self.inbox_messages, - next_index: 0, - max_index: self.inbox_len_read, - match_port_id - }; + return self.inbox.get_read_data_messages(match_port_id); } - /// Retrieves the next unread message from the inbox `None` if there are no - /// (new) messages to read. - // TODO: Fix the clone of the data message, entirely unnecessary - pub(crate) fn read_next_message(&mut self) -> Option { + pub(crate) fn get_next_message_ticket(&mut self) -> Option { if !self.is_in_sync { return None; } - return self.read_next_message_even_if_not_in_sync(); - } - - pub(crate) fn read_next_message_even_if_not_in_sync(&mut self) -> Option { - if self.inbox_len_read == self.inbox_messages.len() { return None; } - - // We want to keep data messages in the inbox, because we need to check - // them in the future. We don't want to keep sync messages around, we - // should only handle them once. Control messages should never be in - // here. - let message = &self.inbox_messages[self.inbox_len_read]; - match &message { - Message::Data(content) => { - // Keep message in inbox for later reading - self.inbox_len_read += 1; - return Some(Message::Data(content.clone())); - }, - Message::SyncComp(_) | Message::SyncPort(_) | Message::SyncControl(_) => { - // Remove message from inbox - let message = self.inbox_messages.remove(self.inbox_len_read); - return Some(message); - }, - Message::Control(_) => unreachable!("control message ended up in component inbox"), - } + return self.inbox.get_next_message_ticket(); + } + + #[inline] + pub(crate) fn get_next_message_ticket_even_if_not_in_sync(&mut self) -> Option { + return self.inbox.get_next_message_ticket(); + } + + #[inline] + pub(crate) fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message { + return self.inbox.read_message_using_ticket(ticket); + } + + #[inline] + pub(crate) fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message { + return self.inbox.take_message_using_ticket(ticket) + } + + /// Puts back a message back into the inbox. The reason being that the + /// message is actually part of the next sync round. This will + pub(crate) fn put_back_message(&mut self, message: Message) { + self.inbox.put_back_message(message); } } @@ -646,6 +608,173 @@ impl<'a> Iterator for MessagesIter<'a> { } } +// ----------------------------------------------------------------------------- +// Private Inbox +// ----------------------------------------------------------------------------- + +/// A structure that contains inbox messages. Some messages are left inside and +/// continuously re-read. Others are taken out, but may potentially be put back +/// for later reading. Later reading in this case implies that they are put back +/// for reading in the next sync round. +struct Inbox { + messages: RawVec, + next_delay_idx: u32, + start_read_idx: u32, + next_read_idx: u32, + generation: u32, +} + +#[derive(Clone, Copy)] +pub(crate) struct MessageTicket { + index: u32, + generation: u32, +} + +impl Inbox { + fn new() -> Self { + return Inbox { + messages: RawVec::new(), + next_delay_idx: 0, + start_read_idx: 0, + next_read_idx: 0, + generation: 0, + } + } + + fn insert_new(&mut self, message: Message) { + assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size + self.messages.push(message); + } + + fn get_next_message_ticket(&mut self) -> Option { + let cur_read_idx = self.next_read_idx as usize; + if cur_read_idx >= self.messages.len() { + return None; + } + + self.generation += 1; + self.next_read_idx += 1; + return Some(MessageTicket{ + index: cur_read_idx as u32, + generation: self.generation + }); + } + + fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message { + debug_assert_eq!(self.generation, ticket.generation); + return unsafe{ &*self.messages.get(ticket.index as usize) } + } + + fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message { + debug_assert_eq!(self.generation, ticket.generation); + unsafe { + let take_idx = ticket.index as usize; + let val = std::ptr::read(self.messages.get(take_idx)); + + // Move messages to the right, clearing up space in the + // front. + let num_move_right = take_idx - self.start_read_idx as usize; + self.messages.move_range( + self.start_read_idx as usize, + self.start_read_idx as usize + 1, + num_move_right + ); + + self.start_read_idx += 1; + + return val; + } + } + + fn put_back_message(&mut self, message: Message) { + // We have space in front of the array because we've taken out a message + // before. + debug_assert!(self.next_delay_idx < self.start_read_idx); + unsafe { + // Write to front of the array + std::ptr::write(self.messages.get_mut(self.next_delay_idx as usize), message); + self.next_delay_idx += 1; + } + } + + fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter { + return MessagesIter{ + messages: self.messages.as_slice(), + next_index: self.start_read_idx as usize, + max_index: self.next_read_idx as usize, + match_port_id + }; + } + + fn prepare_for_next_round(&mut self) { + // Deallocate everything that was read + self.destroy_range(self.start_read_idx, self.next_read_idx); + self.generation += 1; + + // Join up all remaining values with the delayed ones in the front + let num_to_move = self.messages.len() - self.next_read_idx as usize; + self.messages.move_range( + self.next_read_idx as usize, + self.next_delay_idx as usize, + num_to_move + ); + + // Set all indices (and the RawVec len) to make sense in this new state + let new_len = self.next_delay_idx as usize + num_to_move; + self.next_delay_idx = 0; + self.start_read_idx = 0; + self.next_read_idx = 0; + self.messages.len = new_len; + } + + fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) { + // Convoluted assert to make sure we're in non-sync mode, as that is + // when this is called, and that makes our lives easier + let mut idx = 0; + while idx < self.messages.len() { + let message = unsafe{ &*self.messages.get(idx) }; + if let Some(target_port) = message.target_port() { + if target_port == port { + // Transfer port + unsafe { + let message = std::ptr::read(message as *const _); + let remaining = self.messages.len() - idx; + if remaining > 1 { + self.messages.move_range(idx + 1, idx, remaining - 1); + } + self.messages.len -= 1; + new_inbox.insert_new(message); + } + } else { + // Do not transfer port + idx += 1; + } + } + } + } + + #[inline] + fn destroy_range(&mut self, start_idx: u32, end_idx: u32) { + for idx in (start_idx as usize)..(end_idx as usize) { + unsafe { + let msg = self.messages.get_mut(idx); + std::ptr::drop_in_place(msg); + } + } + } +} + +impl Drop for Inbox { + fn drop(&mut self) { + // Whether in sync or not in sync. We have two ranges of allocated + // messages: + // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync) + // - readable messages: from `start_read_idx` to `messages.len` + self.destroy_range(0, self.next_delay_idx); + self.destroy_range(self.start_read_idx, self.messages.len as u32); + } +} + // ----------------------------------------------------------------------------- // Control messages // ----------------------------------------------------------------------------- @@ -658,12 +787,14 @@ struct ControlEntry { enum ControlVariant { ChangedPort(ControlChangedPort), ClosedChannel(ControlClosedChannel), + ReroutePending, } struct ControlChangedPort { target_port: PortIdLocal, // if send to this port, then reroute source_connector: ConnectorId, // connector we expect messages from target_connector: ConnectorId, // connector we need to reroute to + id_of_ack_after_confirmation: u32, // control message ID we need to send to the target upon receiving an ack } struct ControlClosedChannel { @@ -714,19 +845,26 @@ impl ControlMessageHandler { &mut self, port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId, peer_connector_id: ConnectorId, - new_owner_connector_id: ConnectorId + new_owner_connector_id: ConnectorId, new_owner_ctrl_handler: &mut ControlMessageHandler, ) -> ControlMessage { let id = self.take_id(); + let new_owner_id = new_owner_ctrl_handler.take_id(); self.active.push(ControlEntry{ id, variant: ControlVariant::ChangedPort(ControlChangedPort{ target_port: port_id, source_connector: peer_connector_id, target_connector: new_owner_connector_id, + id_of_ack_after_confirmation: new_owner_id, }), }); + new_owner_ctrl_handler.active.push(ControlEntry{ + id: new_owner_id, + variant: ControlVariant::ReroutePending, + }); + return ControlMessage { id, sending_component_id: self_connector_id, @@ -749,14 +887,33 @@ impl ControlMessageHandler { return None; } - /// Handles an Ack as an answer to a previously sent control message - pub fn handle_ack(&mut self, id: u32) { + /// Handles an Ack as an answer to a previously sent control message. + /// Handling an Ack might spawn a new message that needs to be sent. + pub fn handle_ack(&mut self, handler_component_id: ConnectorId, id: u32) -> Option<(ConnectorId, Message)> { let index = self.active.iter() .position(|v| v.id == id); match index { - Some(index) => { self.active.remove(index); }, - None => { todo!("handling of nefarious ACKs"); }, + Some(index) => { + let removed = self.active.remove(index); + match removed.variant { + ControlVariant::ChangedPort(message) => { + return Some(( + message.target_connector, + Message::Control(ControlMessage{ + id: message.id_of_ack_after_confirmation, + sending_component_id: handler_component_id, + content: ControlContent::Ack + }) + )); + }, + _ => return None, + } + }, + None => { + todo!("handling of nefarious ACKs"); + return None; + }, } } diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 2a45f9d8c88f64a672ec64d4dbe31d05f23e9292..0a50f005008a18743870db04ccd0783276122bf4 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -15,9 +15,9 @@ use crate::runtime2::native::{ApplicationSyncAction}; // pub(crate) const NUM_INSTANCES: u32 = 7; // number of test instances constructed // pub(crate) const NUM_LOOPS: u32 = 8; // number of loops within a single test (not used by all tests) -pub(crate) const NUM_THREADS: u32 = 6; -pub(crate) const NUM_INSTANCES: u32 = 2; -pub(crate) const NUM_LOOPS: u32 = 5; +pub(crate) const NUM_THREADS: u32 = 4; +pub(crate) const NUM_INSTANCES: u32 = 1; +pub(crate) const NUM_LOOPS: u32 = 3; fn create_runtime(pdl: &str) -> Runtime { diff --git a/src/runtime2/tests/network_shapes.rs b/src/runtime2/tests/network_shapes.rs index 3797bf0ffea9c0f72e1ae97b735dc4adc97e862d..b0210e6e0cb8891c435bb048f472d54f11a8d6af 100644 --- a/src/runtime2/tests/network_shapes.rs +++ b/src/runtime2/tests/network_shapes.rs @@ -138,7 +138,7 @@ fn test_conga_line_request() { let thing = TestTimer::new("conga_line_request"); run_test_in_runtime(CODE, |api| { api.create_connector("", "constructor", ValueGroup::new_stack(vec![ - Value::UInt32(5), + Value::UInt32(1), Value::UInt32(NUM_LOOPS) ])).expect("create connector"); });