From 418aa1170154a03b70e2b534623a947f3c4cdb37 2021-10-14 22:23:24 From: MH Date: 2021-10-14 22:23:24 Subject: [PATCH] initial sync solution generation --- diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index d11473075fab677b4fee589818b22944edd1c2b5..71e86f483ce3f0640ff404e959dc4040f0049c9d 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -1,9 +1,12 @@ use std::collections::HashMap; +use std::ops::Deref; use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; +use crate::runtime2::global_store::{ConnectorKey, GlobalStore}; +use crate::runtime2::port::PortKind; use super::global_store::ConnectorId; use super::inbox::{ PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage, @@ -341,9 +344,12 @@ impl ConnectorPDL { } - pub fn run(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { + // TODO: Remove GlobalStore, is used to retrieve ports. Ports belong with + // the connector itself, half managed, half accessible (a-la PublicInbox + // and PrivateInbox) + pub fn run(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling { if self.in_sync { - let scheduling = self.run_in_speculative_mode(pd, results); + let scheduling = self.run_in_speculative_mode(pd, global, results); // When in speculative mode we might have generated new sync // solutions, we need to turn them into proposed solutions here. @@ -359,8 +365,10 @@ impl ConnectorPDL { // Transform branch into proposed global solution } + + return scheduling; } else { - let scheduling = self.run_in_deterministic_mode(pd, results); + let scheduling = self.run_in_deterministic_mode(pd, global, results); return scheduling; } } @@ -370,7 +378,7 @@ impl ConnectorPDL { /// where it is the caller's responsibility to immediately take care of /// those changes. The return value indicates when (and if) the connector /// needs to be scheduled again. - pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(self.in_sync); debug_assert!(!self.sync_active.is_empty()); @@ -562,7 +570,7 @@ impl ConnectorPDL { } /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); @@ -775,7 +783,7 @@ impl ConnectorPDL { /// Generates the initial solution for a finished sync branch. If initial /// local solution is valid, then the appropriate message is returned. /// Otherwise the initial solution is inconsistent. - fn generate_initial_solution_for_branch(&self, branch_id: BranchId) -> Option { + fn generate_initial_solution_for_branch(&self, branch_id: BranchId, key: &ConnectorKey, global: &GlobalStore) -> Option { // Retrieve branchg debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode let branch = &self.branches[branch_id.index as usize]; @@ -811,7 +819,6 @@ impl ConnectorPDL { // Turn local port mapping into constraints on other connectors // - constraints on other components due to transferred ports - for port_delta in &branch.ports_delta { // For transferred ports we always have two constraints: one for the // sender and one for the receiver, ensuring it was not used. @@ -820,17 +827,43 @@ impl ConnectorPDL { if !sync_message.check_constraint(self.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) { return None; } - - if !sync_message.add_or_check_constraint() + + // Might need to check if we own the other side of the channel + let (peer_port_id, peer_connector_id) = { + let port = global.ports.get(key, port_delta.port_id); + (port.peer_id, port.peer_connector) + }; + + if !sync_message.add_or_check_constraint(peer_connector, SyncBranchConstraint::SilentPort(peer_port_id)).unwrap() { + return None; + } } // - constraints on other components due to owned ports for port_index in 0..self.ports.num_ports() { + let port_id = self.ports.get_port_id(port_index); let port = self.ports.get_port(branch_id.index, port_index); + let (peer_port_id, peer_connector_id, is_getter) = { + let port = global.ports.get(key, port_id); + (port.peer_id, port.peer_connector, port.kind == PortKind::Getter) + }; + + let constraint = if port.is_assigned { + if is_getter { + SyncBranchConstraint::BranchNumber(port.last_registered_branch_id) + } else { + SyncBranchConstraint::PortMapping(peer_port_id, port.last_registered_branch_id) + } + } else { + SyncBranchConstraint::SilentPort(peer_port_id) + }; + if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() { + return None; + } } - return None; + return Some(sync_message); } fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec) {