diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 5769144ca08a4f4f213f7b21801117d11f3cf8da..e951d0d9c29ff1c2f208c3dccd1df388d9d9b54a 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -4,10 +4,11 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; +use crate::runtime2::scheduler::Scheduler; use super::ConnectorId; use super::native::Connector; -use super::scheduler::ConnectorCtx; +use super::scheduler::{SchedulerCtx, ConnectorCtx}; use super::inbox::{ PrivateInbox, PublicInbox, DataMessage, SyncMessage, SolutionMessage, Message, MessageContents, @@ -332,22 +333,53 @@ pub(crate) struct ConnectorPDL { pub ports: ConnectorPorts, } -struct TempCtx {} -impl RunContext for TempCtx { +struct ConnectorRunContext<'a> { + inbox: &'a PrivateInbox, + ports: &'a ConnectorPorts, + branch: &'a Branch, + scheduler: SchedulerCtx<'a>, +} + +impl<'a> RunContext for ConnectorRunContext<'a> { fn did_put(&mut self, port: PortId) -> bool { - todo!() + if self.branch.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) { + // Either acquired or released, must be silent + return false; + } + + let port_index = self.ports.get_port_index(PortIdLocal::new(port.0.u32_suffix)).unwrap(); + let mapping = self.ports.get_port(self.branch.index.index, port_index); + return mapping.is_assigned; } fn get(&mut self, port: PortId) -> Option { - todo!() + let port_id = PortIdLocal::new(port.0.u32_suffix); + match self.branch.received.get(&port_id) { + Some(message) => Some(message.message.clone()), + None => None, + } } fn fires(&mut self, port: PortId) -> Option { - todo!() + let port_id = PortIdLocal::new(port.0.u32_suffix); + if self.branch.ports_delta.iter().any(|v| v.port_id == port_id) { + return None + } + + let port_index = self.ports.get_port_index(port_id).unwrap(); + let mapping = self.ports.get_port(self.branch.index.index, port_index); + + if mapping.is_assigned { + return Some(Value::Bool(mapping.num_times_fired != 0)); + } else { + return None; + } } fn get_channel(&mut self) -> Option<(Value, Value)> { - todo!() + let (getter, putter) = self.scheduler.runtime.create_channel(); + debug_assert_eq!(getter.kind, PortKind::Getter); + } } @@ -364,9 +396,9 @@ impl Connector for ConnectorPDL { } } - fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, sched_ctx: &SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { if self.in_sync { - let scheduling = self.run_in_speculative_mode(pd, ctx, delta_state); + let scheduling = self.run_in_speculative_mode(pd, conn_ctx, delta_state); // When in speculative mode we might have generated new sync // solutions, we need to turn them into proposed solutions here. @@ -388,9 +420,9 @@ impl Connector for ConnectorPDL { // Turn local solution into a message and send it along // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed - let solution_message = self.generate_initial_solution_for_branch(branch_id, ctx); + let solution_message = self.generate_initial_solution_for_branch(branch_id, conn_ctx); if let Some(valid_solution) = solution_message { - self.submit_sync_solution(valid_solution, ctx, delta_state); + self.submit_sync_solution(valid_solution, conn_ctx, delta_state); } else { // Branch is actually invalid, but we only just figured // it out. We need to mark it as invalid to prevent @@ -415,7 +447,7 @@ impl Connector for ConnectorPDL { return scheduling; } else { - let scheduling = self.run_in_deterministic_mode(pd, ctx, delta_state); + let scheduling = self.run_in_deterministic_mode(pd, conn_ctx, delta_state); return scheduling; } } @@ -694,7 +726,7 @@ impl ConnectorPDL { let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active); // Run the branch to the next blocking point - let mut run_context = TempCtx{}; + let mut run_context = ConnectorRunContext {}; let run_result = branch.code_state.run(&mut run_context, pd); // Match statement contains `return` statements only if the particular @@ -894,7 +926,7 @@ impl ConnectorPDL { } /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, sched_ctx: &SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); @@ -902,7 +934,11 @@ impl ConnectorPDL { let branch = &mut self.branches[0]; debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); - let mut run_context = TempCtx{}; + let mut run_context = ConnectorRunContext{ + inbox: &self.inbox, + ports: &self.ports, + branch: &Branch {} + }; let run_result = branch.code_state.run(&mut run_context, pd); match run_result {