diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 7fa49811e7ed9d11bc76bfe75864b66da837015e..bd18c4424c96977ba644e6e430c04867dadc3b14 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -9,7 +9,7 @@ use crate::runtime2::consensus::RoundConclusion; use super::{ConnectorKey, ConnectorId, RuntimeInner}; use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState}; -use super::scheduler::{SchedulerCtx, ComponentCtx}; +use super::scheduler::{SchedulerCtx, ComponentCtx, MessageTicket}; use super::port::{Port, PortIdLocal, Channel, PortKind}; use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; use super::connector::{ConnectorScheduling, ConnectorPDL}; @@ -111,27 +111,33 @@ impl ConnectorApplication { } fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) { - while let Some(message) = comp_ctx.read_next_message() { - match message { - Message::Data(message) => self.handle_new_data_message(message, comp_ctx), - Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx), - Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx), - Message::SyncControl(message) => todo!("implement"), - Message::Control(_) => unreachable!("control message in native API component"), + while let Some(ticket) = comp_ctx.get_next_message_ticket() { + let message = comp_ctx.read_message_using_ticket(ticket); + if let Message::Data(_) = message { + self.handle_new_data_message(ticket, comp_ctx) + } else { + match comp_ctx.take_message_using_ticket(ticket) { + Message::Data(message) => unreachable!(), + Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx), + Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx), + Message::SyncControl(message) => todo!("implement"), + Message::Control(_) => unreachable!("control message in native API component"), + } } } } - pub(crate) fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { + pub(crate) fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. - if !self.consensus.handle_new_data_message(&message, ctx) { + if !self.consensus.handle_new_data_message(ticket, ctx) { // Old message, so drop it return; } let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); while let Some(branch_id) = iter_id { + let message = ctx.read_message_using_ticket(ticket).as_data(); iter_id = self.tree.get_queue_next(branch_id); let branch = &self.tree[branch_id];