diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 14fd2fb0dd6670acda349042897d5b70743c1c47..a73582435ecaeecf86030c59d95f085e71cff9b1 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -103,6 +103,17 @@ pub(crate) enum Mode { Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 } +impl Mode { + fn is_in_sync_block(&self) -> bool { + use Mode::*; + + match self { + Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true, + NonSync | StartExit | BusyExit | Exit => false, + } + } +} + struct SelectCase { involved_ports: Vec, } @@ -453,6 +464,11 @@ impl CompPDL { fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component starting sync mode"); self.consensus.notify_sync_start(comp_ctx); + for message in self.inbox_main.iter() { + if let Some(message) = message { + self.consensus.handle_new_data_message(comp_ctx, message); + } + } debug_assert_eq!(self.mode, Mode::NonSync); self.mode = Mode::Sync; } @@ -534,6 +550,11 @@ impl CompPDL { /// will handle putting it in the correct place, and potentially blocking /// the port in case too many messages are being received. fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { + // Whatever we do, glean information from headers in message + if self.mode.is_in_sync_block() { + self.consensus.handle_new_data_message(comp_ctx, &message); + } + // Check if we can insert it directly into the storage associated with // the port let target_port_id = message.data_header.target_port;