diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index fffbad2d7b3a76febb3067a58d4e0c31173c2fe1..916e7cd1c78c2580cd51044c2041e7d67c88745c 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -4,7 +4,7 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::scheduler::Scheduler; +use crate::runtime2::scheduler::{ComponentCtxFancy, Scheduler}; use super::ConnectorId; use super::native::Connector; @@ -410,11 +410,11 @@ impl Connector for ConnectorPDL { } } - fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { if self.in_sync { // Check for new messages we haven't seen before. If any of the // pending branches can accept the message, do so. - while let Some((target_port_id, message)) = self.inbox.next_message() { + while let Some((target_port_id, message)) = comp_ctx.read_next_message() { let mut branch_idx = self.sync_pending_get.first; while branch_idx != 0 { let branch = &self.branches[branch_idx as usize]; @@ -444,7 +444,7 @@ impl Connector for ConnectorPDL { } } - let scheduling = self.run_in_speculative_mode(sched_ctx, conn_ctx, delta_state); + let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx, conn_ctx, delta_state); // When in speculative mode we might have generated new sync // solutions, we need to turn them into proposed solutions here. @@ -493,7 +493,7 @@ impl Connector for ConnectorPDL { return scheduling; } else { - let scheduling = self.run_in_deterministic_mode(sched_ctx, conn_ctx, delta_state); + let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx, conn_ctx, delta_state); return scheduling; } } @@ -527,7 +527,7 @@ impl ConnectorPDL { // ------------------------------------------------------------------------- pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) { - self.inbox.insert_message(target_port, message); + // self.inbox.insert_message(target_port, message); } /// Accepts a synchronous message and combines it with the locally stored @@ -764,7 +764,7 @@ impl ConnectorPDL { /// where it is the caller's responsibility to immediately take care of /// those changes. The return value indicates when (and if) the connector /// needs to be scheduled again. - pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(self.in_sync); if self.sync_active.is_empty() { @@ -864,7 +864,7 @@ impl ConnectorPDL { // But if some messages can be immediately applied, do so // now. - let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id); + let messages = comp_ctx.get_read_messages(local_port_id, port_mapping.last_registered_branch_id); let mut did_have_messages = false; for message in messages { @@ -986,7 +986,7 @@ impl ConnectorPDL { } /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1);