diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index a86101c3ad92818e07791e2e9ef9a23829db3e4f..d11473075fab677b4fee589818b22944edd1c2b5 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -4,8 +4,12 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage}; -use crate::runtime2::port::PortIdLocal; +use super::global_store::ConnectorId; +use super::inbox::{ + PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage, + SyncBranchConstraint, SyncConnectorSolution +}; +use super::port::PortIdLocal; /// Represents the identifier of a branch (the index within its container). An /// ID of `0` generally means "no branch" (e.g. no parent, or a port did not @@ -26,7 +30,7 @@ impl BranchId { } #[inline] - fn is_valid(&self) -> bool { + pub(crate) fn is_valid(&self) -> bool { return self.index != 0; } } @@ -199,6 +203,12 @@ impl ConnectorPorts { return None } + /// Retrieves the ID associated with the port at the provided index + #[inline] + fn get_port_id(&self, port_index: usize) -> PortIdLocal { + return self.owned_ports[port_index]; + } + #[inline] fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment { let mapped_idx = self.mapped_index(branch_idx, port_idx); @@ -266,7 +276,7 @@ impl ConnectorPublic { // TODO: Do this outside of the connector, create a wrapping struct pub(crate) struct ConnectorPDL { // State and properties of connector itself - id: u32, + id: ConnectorId, in_sync: bool, // Branch management branches: Vec, // first branch is always non-speculative one @@ -301,10 +311,12 @@ impl RunContext for TempCtx { impl ConnectorPDL { /// Constructs a representation of a connector. The assumption is that the /// initial branch is at the first instruction of the connector's code, - /// hence is in a non-sync state. - pub fn new(id: u32, initial_branch: Branch, owned_ports: Vec) -> Self { + /// hence is in a non-sync state. Note that the initial ID is invalid, we + /// assume the connector will get inserted into the runtime, there it will + /// receive its ID. + pub fn new(initial_branch: Branch, owned_ports: Vec) -> Self { Self{ - id, + id: ConnectorId::new_invalid(), in_sync: false, branches: vec![initial_branch], sync_active: BranchQueue::new(), @@ -316,6 +328,11 @@ impl ConnectorPDL { } } + pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { + debug_assert!(!self.id.is_valid()); // ID should only be set once + self.id = id; + } + pub fn is_in_sync_mode(&self) -> bool { return self.in_sync; } @@ -591,7 +608,7 @@ impl ConnectorPDL { }; let new_connector_ports = results.ports.clone(); // TODO: Do something with this let new_connector_branch = Branch::new_initial_branch(new_connector_state); - let new_connector = ConnectorPDL::new(0, new_connector_branch, new_connector_ports); + let new_connector = ConnectorPDL::new(new_connector_branch, new_connector_ports); results.new_connectors.push(new_connector); @@ -653,7 +670,8 @@ impl ConnectorPDL { } // Helpers for local port management. Specifically for adopting/losing - // ownership over ports + // ownership over ports, and for checking if specific ports can be sent + // over another port. /// Releasing ownership of ports while in non-sync mode. This only occurs /// while instantiating new connectors @@ -754,7 +772,79 @@ impl ConnectorPDL { // Helpers for generating and handling sync messages (and the solutions that // are described by those sync messages) - fn generate_initial_solution_for_branch(&self, branch_id: BranchId,) + /// Generates the initial solution for a finished sync branch. If initial + /// local solution is valid, then the appropriate message is returned. + /// Otherwise the initial solution is inconsistent. + fn generate_initial_solution_for_branch(&self, branch_id: BranchId) -> Option { + // Retrieve branchg + debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode + let branch = &self.branches[branch_id.index as usize]; + debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd); + + // Set up storage (this is also the storage for all of the connectors + // that will be visited, hence the initial size approximation) + let mut all_branch_ids = Vec::new(); + self.branch_ids_of_execution_path(branch_id, &mut all_branch_ids); + + let num_ports = self.ports.num_ports(); + let approximate_peers = num_ports; + let mut initial_solution_port_mapping = Vec::with_capacity(num_ports); + for port_idx in 0..self.ports.num_ports() { + let port_id = self.ports.get_port_id(port_idx); + let port_desc = self.ports.get_port(branch_id.index, port_idx); + + // Note: if assigned then we expect a valid branch ID. Otherwise we have the "invalid + // branch" as ID, marking that we want it to be silent + debug_assert!(port_desc.is_assigned == port_desc.last_registered_branch_id.is_valid()); + initial_solution_port_mapping.push((port_id, port_desc.last_registered_branch_id)); + } + + let initial_local_solution = SyncConnectorSolution{ + connector_id: self.id, + terminating_branch_id: branch_id, + execution_branch_ids: all_branch_ids, + final_port_mapping: initial_solution_port_mapping, + }; + + let mut sync_message = SyncMessage::new(initial_local_solution, approximate_peers); + + // Turn local port mapping into constraints on other connectors + + // - constraints on other components due to transferred ports + + for port_delta in &branch.ports_delta { + // For transferred ports we always have two constraints: one for the + // sender and one for the receiver, ensuring it was not used. + // TODO: This will fail if a port is passed around multiple times. + // maybe a special "passed along" entry in `ports_delta`. + if !sync_message.check_constraint(self.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) { + return None; + } + + if !sync_message.add_or_check_constraint() + } + + // - constraints on other components due to owned ports + for port_index in 0..self.ports.num_ports() { + let port = self.ports.get_port(branch_id.index, port_index); + + } + + return None; + } + + fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec) { + debug_assert!(parents.is_empty()); + + let mut next_branch_id = leaf_branch_id; + debug_assert!(next_branch_id.is_valid()); + + while next_branch_id.is_valid() { + parents.push(next_branch_id); + let branch = &self.branches[next_branch_id.index as usize]; + next_branch_id = branch.parent_index; + } + } } /// A data structure passed to a connector whose code is being executed that is