From c97c5d60bc613ee229252211cebc9762b4658dbb 2021-10-14 18:44:14 From: MH Date: 2021-10-14 18:44:14 Subject: [PATCH] commit before restructuring port storage --- diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index a86101c3ad92818e07791e2e9ef9a23829db3e4f..d11473075fab677b4fee589818b22944edd1c2b5 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -4,8 +4,12 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage}; -use crate::runtime2::port::PortIdLocal; +use super::global_store::ConnectorId; +use super::inbox::{ + PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage, + SyncBranchConstraint, SyncConnectorSolution +}; +use super::port::PortIdLocal; /// Represents the identifier of a branch (the index within its container). An /// ID of `0` generally means "no branch" (e.g. no parent, or a port did not @@ -26,7 +30,7 @@ impl BranchId { } #[inline] - fn is_valid(&self) -> bool { + pub(crate) fn is_valid(&self) -> bool { return self.index != 0; } } @@ -199,6 +203,12 @@ impl ConnectorPorts { return None } + /// Retrieves the ID associated with the port at the provided index + #[inline] + fn get_port_id(&self, port_index: usize) -> PortIdLocal { + return self.owned_ports[port_index]; + } + #[inline] fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment { let mapped_idx = self.mapped_index(branch_idx, port_idx); @@ -266,7 +276,7 @@ impl ConnectorPublic { // TODO: Do this outside of the connector, create a wrapping struct pub(crate) struct ConnectorPDL { // State and properties of connector itself - id: u32, + id: ConnectorId, in_sync: bool, // Branch management branches: Vec, // first branch is always non-speculative one @@ -301,10 +311,12 @@ impl RunContext for TempCtx { impl ConnectorPDL { /// Constructs a representation of a connector. The assumption is that the /// initial branch is at the first instruction of the connector's code, - /// hence is in a non-sync state. - pub fn new(id: u32, initial_branch: Branch, owned_ports: Vec) -> Self { + /// hence is in a non-sync state. Note that the initial ID is invalid, we + /// assume the connector will get inserted into the runtime, there it will + /// receive its ID. + pub fn new(initial_branch: Branch, owned_ports: Vec) -> Self { Self{ - id, + id: ConnectorId::new_invalid(), in_sync: false, branches: vec![initial_branch], sync_active: BranchQueue::new(), @@ -316,6 +328,11 @@ impl ConnectorPDL { } } + pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { + debug_assert!(!self.id.is_valid()); // ID should only be set once + self.id = id; + } + pub fn is_in_sync_mode(&self) -> bool { return self.in_sync; } @@ -591,7 +608,7 @@ impl ConnectorPDL { }; let new_connector_ports = results.ports.clone(); // TODO: Do something with this let new_connector_branch = Branch::new_initial_branch(new_connector_state); - let new_connector = ConnectorPDL::new(0, new_connector_branch, new_connector_ports); + let new_connector = ConnectorPDL::new(new_connector_branch, new_connector_ports); results.new_connectors.push(new_connector); @@ -653,7 +670,8 @@ impl ConnectorPDL { } // Helpers for local port management. Specifically for adopting/losing - // ownership over ports + // ownership over ports, and for checking if specific ports can be sent + // over another port. /// Releasing ownership of ports while in non-sync mode. This only occurs /// while instantiating new connectors @@ -754,7 +772,79 @@ impl ConnectorPDL { // Helpers for generating and handling sync messages (and the solutions that // are described by those sync messages) - fn generate_initial_solution_for_branch(&self, branch_id: BranchId,) + /// Generates the initial solution for a finished sync branch. If initial + /// local solution is valid, then the appropriate message is returned. + /// Otherwise the initial solution is inconsistent. + fn generate_initial_solution_for_branch(&self, branch_id: BranchId) -> Option { + // Retrieve branchg + debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode + let branch = &self.branches[branch_id.index as usize]; + debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd); + + // Set up storage (this is also the storage for all of the connectors + // that will be visited, hence the initial size approximation) + let mut all_branch_ids = Vec::new(); + self.branch_ids_of_execution_path(branch_id, &mut all_branch_ids); + + let num_ports = self.ports.num_ports(); + let approximate_peers = num_ports; + let mut initial_solution_port_mapping = Vec::with_capacity(num_ports); + for port_idx in 0..self.ports.num_ports() { + let port_id = self.ports.get_port_id(port_idx); + let port_desc = self.ports.get_port(branch_id.index, port_idx); + + // Note: if assigned then we expect a valid branch ID. Otherwise we have the "invalid + // branch" as ID, marking that we want it to be silent + debug_assert!(port_desc.is_assigned == port_desc.last_registered_branch_id.is_valid()); + initial_solution_port_mapping.push((port_id, port_desc.last_registered_branch_id)); + } + + let initial_local_solution = SyncConnectorSolution{ + connector_id: self.id, + terminating_branch_id: branch_id, + execution_branch_ids: all_branch_ids, + final_port_mapping: initial_solution_port_mapping, + }; + + let mut sync_message = SyncMessage::new(initial_local_solution, approximate_peers); + + // Turn local port mapping into constraints on other connectors + + // - constraints on other components due to transferred ports + + for port_delta in &branch.ports_delta { + // For transferred ports we always have two constraints: one for the + // sender and one for the receiver, ensuring it was not used. + // TODO: This will fail if a port is passed around multiple times. + // maybe a special "passed along" entry in `ports_delta`. + if !sync_message.check_constraint(self.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) { + return None; + } + + if !sync_message.add_or_check_constraint() + } + + // - constraints on other components due to owned ports + for port_index in 0..self.ports.num_ports() { + let port = self.ports.get_port(branch_id.index, port_index); + + } + + return None; + } + + fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec) { + debug_assert!(parents.is_empty()); + + let mut next_branch_id = leaf_branch_id; + debug_assert!(next_branch_id.is_valid()); + + while next_branch_id.is_valid() { + parents.push(next_branch_id); + let branch = &self.branches[next_branch_id.index as usize]; + next_branch_id = branch.parent_index; + } + } } /// A data structure passed to a connector whose code is being executed that is diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs index e71494f58f910e269179d5e47fbdfd28cffca18b..4d1727af58fd65e42030dca6e874baab875504bb 100644 --- a/src/runtime2/global_store.rs +++ b/src/runtime2/global_store.rs @@ -44,6 +44,11 @@ impl ConnectorId { pub fn new_invalid() -> ConnectorId { return ConnectorId(u32::MAX); } + + #[inline] + pub(crate) fn is_valid(&self) -> bool { + return self.0 != u32::MAX; + } } // TODO: Change this, I hate this. But I also don't want to put `public` and @@ -135,7 +140,14 @@ impl ConnectorStore { } } - return ConnectorKey{ index: index as u32 }; + // TODO: Clean up together with the trait + let key = ConnectorKey{ index: index as u32 }; + let connector = self.get_mut(&key); + if let ConnectorVariant::UserDefined(connector) = &mut connector.connector { + connector.set_connector_id(key.downcast()); + } + + return key; } pub(crate) fn destroy(&self, key: ConnectorKey) { diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index 022ab16496f3aa2ab43511267f73f022916e8654..c71ec0d15da177c50795ef34949e878eafb347ba 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -44,25 +44,135 @@ pub struct DataMessage { pub enum SyncBranchConstraint { SilentPort(PortIdLocal), - BranchNumber(u32), - PortMapping(PortIdLocal, u32), + BranchNumber(BranchId), + PortMapping(PortIdLocal, BranchId), } pub struct SyncConnectorSolution { - connector_id: ConnectorId, - terminating_branch_id: BranchId, - execution_branch_ids: Vec, // ends with terminating branch ID + pub connector_id: ConnectorId, + pub terminating_branch_id: BranchId, + pub execution_branch_ids: Vec, // no particular ordering of IDs enforced + pub final_port_mapping: Vec<(PortIdLocal, BranchId)> } pub struct SyncConnectorConstraints { - connector_id: ConnectorId, - constraints: Vec, + pub connector_id: ConnectorId, + pub constraints: Vec, } pub struct SyncMessage { - connector_solutions: Vec, - connector_constraints: Vec, - connectors_to_visit: Vec, + pub local_solutions: Vec, + pub constraints: Vec, + pub to_visit: Vec, +} + +// TODO: Shouldn't really be here, right? +impl SyncMessage { + /// Creates a new sync message. Assumes that it is created by a connector + /// that has just encountered a new local solution. + pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self { + let mut local_solutions = Vec::with_capacity(approximate_peers); + local_solutions.push(initial_solution); + + return Self{ + local_solutions, + constraints: Vec::with_capacity(approximate_peers), + to_visit: Vec::with_capacity(approximate_peers), + }; + } + + /// Checks if a connector has already provided a local solution + pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool { + return self.local_solutions + .iter() + .any(|v| v.connector_id == connector_id); + } + + /// Adds a new constraint. If the connector has already provided a local + /// solution then the constraint will be checked. Otherwise the constraint + /// will be added to the solution. If this is the first constraint for a + /// connector then it will be added to the connectors that still have to be + /// visited. + /// + /// If this returns true then the constraint was added, or the local + /// solution for the specified connector satisfies the constraint. If this + /// function returns an error then we're dealing with a nefarious peer. + pub(crate) fn add_or_check_constraint( + &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint + ) -> Result { + if self.has_local_solution_for(connector_id) { + return self.check_constraint(connector_id, constraint); + } else { + self.add_constraint(connector_id); + return Ok(true); + } + } + + /// Pushes a new connector constraint. Caller must ensure that the solution + /// has not yet arrived at the specified connector (because then it would no + /// longer have constraints, but a proposed solution instead). + pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) { + debug_assert!(!self.has_local_solution_for(connector_id)); + + let position = self.constraints + .iter() + .position(|v| v.connector_id == connector_id); + + match position { + Some(index) => { + // Has pre-existing constraints + debug_assert!(self.to_visit.contains(&connector_id)); + let entry = &mut self.constraints[index]; + entry.constraints.push(constraint); + }, + None => { + debug_assert!(!self.to_visit.contains(&connector_id)); + self.constraints.push(SyncConnectorConstraints{ + connector_id, + constraints: vec![constraint], + }); + self.to_visit.push(connector_id); + } + } + } + + /// Checks if a constraint is satisfied by a solution. Caller must make sure + /// that a local solution has already been provided. Will return an error + /// value only if the provided constraint does not make sense (i.e. a + /// nefarious peer has supplied a constraint with a port we do not own). + pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result { + debug_assert!(self.has_local_solution_for(connector_id)); + + let entry = self.local_solutions + .iter() + .find(|v| v.connector_id == connector_id) + .unwrap(); + + match constraint { + SyncBranchConstraint::SilentPort(silent_port_id) => { + for (port_id, mapped_id) in &entry.final_port_mapping { + if port_id == silent_port_id { + // If silent, then mapped ID is invalid + return Ok(!mapped_id.is_valid()) + } + } + + return Err(()); + }, + SyncBranchConstraint::BranchNumber(expected_branch_id) => { + return Ok(entry.execution_branch_ids.contains(&expected_branch_id)); + }, + SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => { + for (port_id, mapped_id) in &entry.final_port_mapping { + if port_id == port_id { + return Ok(*mapped_id == expected_branch_id); + } + } + + return Err(()); + }, + } + } } /// A control message. These might be sent by the scheduler to notify eachother diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 82b5de08d295ab4ffa43ff00c86d67787eb17d87..7247c23a3b1190755c7c3aa479bf699582c90771 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -121,7 +121,7 @@ impl ApplicationInterface { } let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?; - let connector = ConnectorPDL::new(0, Branch::new_initial_branch(state), initial_ports); + let connector = ConnectorPDL::new(Branch::new_initial_branch(state), initial_ports); // Put on job queue {