diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 5495680cd51dc7f94a8de2fa83a3e7a6d0eba35d..58db4129573181cf62b53c3c60010562499da244 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -39,7 +39,7 @@ use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_va use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, SyncControlMessage, PublicInbox}; use super::native::Connector; use super::port::{PortKind, PortIdLocal}; -use super::scheduler::{ComponentCtx, SchedulerCtx}; +use super::scheduler::{ComponentCtx, SchedulerCtx, MessageTicket}; pub(crate) struct ConnectorPublic { pub inbox: PublicInbox, @@ -185,35 +185,45 @@ impl ConnectorPDL { // --- Handling messages pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) -> Option { - while let Some(message) = ctx.read_next_message() { - match message { - Message::Data(message) => self.handle_new_data_message(message, ctx), - Message::SyncComp(message) => { - if let Some(result) = self.handle_new_sync_comp_message(message, ctx) { - return Some(result); - } - }, - Message::SyncPort(message) => self.handle_new_sync_port_message(message, ctx), - Message::SyncControl(message) => { - if let Some(result) = self.handle_new_sync_control_message(message, ctx) { - return Some(result); - } - }, - Message::Control(_) => unreachable!("control message in component"), + while let Some(ticket) = ctx.get_next_message_ticket() { + let message = ctx.read_message_using_ticket(ticket); + let immediate_result = if let Message::Data(_) = message { + self.handle_new_data_message(ticket, ctx); + None + } else { + match ctx.take_message_using_ticket(ticket) { + Message::Data(_) => unreachable!(), + Message::SyncComp(message) => { + self.handle_new_sync_comp_message(message, ctx) + }, + Message::SyncPort(message) => { + self.handle_new_sync_port_message(message, ctx); + None + }, + Message::SyncControl(message) => { + self.handle_new_sync_control_message(message, ctx) + }, + Message::Control(_) => unreachable!("control message in component"), + } + }; + + if let Some(result) = immediate_result { + return Some(result); } } return None; } - pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { + pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. - if !self.consensus.handle_new_data_message(&message, ctx) { - // Old message, so drop it + if self.consensus.handle_new_data_message(ticket, ctx) { + // Message should not be handled now return; } + let message = ctx.read_message_using_ticket(ticket).as_data(); let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); while let Some(branch_id) = iter_id { iter_id = self.tree.get_queue_next(branch_id);