Changeset - c97c5d60bc61
[Not reviewed]
0 4 0
MH - 4 years ago 2021-10-14 18:44:14
contact@maxhenger.nl
commit before restructuring port storage
4 files changed with 234 insertions and 22 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -4,8 +4,12 @@ use std::sync::atomic::AtomicBool;
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage};
 
use crate::runtime2::port::PortIdLocal;
 
use super::global_store::ConnectorId;
 
use super::inbox::{
 
    PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::PortIdLocal;
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
@@ -26,7 +30,7 @@ impl BranchId {
 
    }
 

	
 
    #[inline]
 
    fn is_valid(&self) -> bool {
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 
@@ -199,6 +203,12 @@ impl ConnectorPorts {
 
        return None
 
    }
 

	
 
    /// Retrieves the ID associated with the port at the provided index
 
    #[inline]
 
    fn get_port_id(&self, port_index: usize) -> PortIdLocal {
 
        return self.owned_ports[port_index];
 
    }
 

	
 
    #[inline]
 
    fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
@@ -266,7 +276,7 @@ impl ConnectorPublic {
 
// TODO: Do this outside of the connector, create a wrapping struct
 
pub(crate) struct ConnectorPDL {
 
    // State and properties of connector itself
 
    id: u32,
 
    id: ConnectorId,
 
    in_sync: bool,
 
    // Branch management
 
    branches: Vec<Branch>, // first branch is always non-speculative one
 
@@ -301,10 +311,12 @@ impl RunContext for TempCtx {
 
impl ConnectorPDL {
 
    /// Constructs a representation of a connector. The assumption is that the
 
    /// initial branch is at the first instruction of the connector's code,
 
    /// hence is in a non-sync state.
 
    pub fn new(id: u32, initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
    /// hence is in a non-sync state. Note that the initial ID is invalid, we
 
    /// assume the connector will get inserted into the runtime, there it will
 
    /// receive its ID.
 
    pub fn new(initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            id,
 
            id: ConnectorId::new_invalid(),
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
@@ -316,6 +328,11 @@ impl ConnectorPDL {
 
        }
 
    }
 

	
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        debug_assert!(!self.id.is_valid()); // ID should only be set once
 
        self.id = id;
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 
@@ -591,7 +608,7 @@ impl ConnectorPDL {
 
                };
 
                let new_connector_ports = results.ports.clone(); // TODO: Do something with this
 
                let new_connector_branch = Branch::new_initial_branch(new_connector_state);
 
                let new_connector = ConnectorPDL::new(0, new_connector_branch, new_connector_ports);
 
                let new_connector = ConnectorPDL::new(new_connector_branch, new_connector_ports);
 

	
 
                results.new_connectors.push(new_connector);
 

	
 
@@ -653,7 +670,8 @@ impl ConnectorPDL {
 
    }
 

	
 
    // Helpers for local port management. Specifically for adopting/losing
 
    // ownership over ports
 
    // ownership over ports, and for checking if specific ports can be sent
 
    // over another port.
 

	
 
    /// Releasing ownership of ports while in non-sync mode. This only occurs
 
    /// while instantiating new connectors
 
@@ -754,7 +772,79 @@ impl ConnectorPDL {
 
    // Helpers for generating and handling sync messages (and the solutions that
 
    // are described by those sync messages)
 

	
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId,)
 
    /// Generates the initial solution for a finished sync branch. If initial
 
    /// local solution is valid, then the appropriate message is returned.
 
    /// Otherwise the initial solution is inconsistent.
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId) -> Option<SyncMessage> {
 
        // Retrieve branchg
 
        debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode
 
        let branch = &self.branches[branch_id.index as usize];
 
        debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd);
 

	
 
        // Set up storage (this is also the storage for all of the connectors
 
        // that will be visited, hence the initial size approximation)
 
        let mut all_branch_ids = Vec::new();
 
        self.branch_ids_of_execution_path(branch_id, &mut all_branch_ids);
 

	
 
        let num_ports = self.ports.num_ports();
 
        let approximate_peers = num_ports;
 
        let mut initial_solution_port_mapping = Vec::with_capacity(num_ports);
 
        for port_idx in 0..self.ports.num_ports() {
 
            let port_id = self.ports.get_port_id(port_idx);
 
            let port_desc = self.ports.get_port(branch_id.index, port_idx);
 

	
 
            // Note: if assigned then we expect a valid branch ID. Otherwise we have the "invalid
 
            // branch" as ID, marking that we want it to be silent
 
            debug_assert!(port_desc.is_assigned == port_desc.last_registered_branch_id.is_valid());
 
            initial_solution_port_mapping.push((port_id, port_desc.last_registered_branch_id));
 
        }
 

	
 
        let initial_local_solution = SyncConnectorSolution{
 
            connector_id: self.id,
 
            terminating_branch_id: branch_id,
 
            execution_branch_ids: all_branch_ids,
 
            final_port_mapping: initial_solution_port_mapping,
 
        };
 

	
 
        let mut sync_message = SyncMessage::new(initial_local_solution, approximate_peers);
 

	
 
        // Turn local port mapping into constraints on other connectors
 

	
 
        // - constraints on other components due to transferred ports
 

	
 
        for port_delta in &branch.ports_delta {
 
            // For transferred ports we always have two constraints: one for the
 
            // sender and one for the receiver, ensuring it was not used.
 
            // TODO: This will fail if a port is passed around multiple times.
 
            //  maybe a special "passed along" entry in `ports_delta`.
 
            if !sync_message.check_constraint(self.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) {
 
                return None;
 
            }
 
            
 
            if !sync_message.add_or_check_constraint()
 
        }
 

	
 
        // - constraints on other components due to owned ports
 
        for port_index in 0..self.ports.num_ports() {
 
            let port = self.ports.get_port(branch_id.index, port_index);
 

	
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec<BranchId>) {
 
        debug_assert!(parents.is_empty());
 

	
 
        let mut next_branch_id = leaf_branch_id;
 
        debug_assert!(next_branch_id.is_valid());
 

	
 
        while next_branch_id.is_valid() {
 
            parents.push(next_branch_id);
 
            let branch = &self.branches[next_branch_id.index as usize];
 
            next_branch_id = branch.parent_index;
 
        }
 
    }
 
}
 

	
 
/// A data structure passed to a connector whose code is being executed that is
src/runtime2/global_store.rs
Show inline comments
 
@@ -44,6 +44,11 @@ impl ConnectorId {
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
@@ -135,7 +140,14 @@ impl ConnectorStore {
 
            }
 
        }
 

	
 
        return ConnectorKey{ index: index as u32 };
 
        // TODO: Clean up together with the trait
 
        let key = ConnectorKey{ index: index as u32 };
 
        let connector = self.get_mut(&key);
 
        if let ConnectorVariant::UserDefined(connector) = &mut connector.connector {
 
            connector.set_connector_id(key.downcast());
 
        }
 

	
 
        return key;
 
    }
 

	
 
    pub(crate) fn destroy(&self, key: ConnectorKey) {
src/runtime2/inbox.rs
Show inline comments
 
@@ -44,25 +44,135 @@ pub struct DataMessage {
 

	
 
pub enum SyncBranchConstraint {
 
    SilentPort(PortIdLocal),
 
    BranchNumber(u32),
 
    PortMapping(PortIdLocal, u32),
 
    BranchNumber(BranchId),
 
    PortMapping(PortIdLocal, BranchId),
 
}
 

	
 
pub struct SyncConnectorSolution {
 
    connector_id: ConnectorId,
 
    terminating_branch_id: BranchId,
 
    execution_branch_ids: Vec<BranchId>, // ends with terminating branch ID
 
    pub connector_id: ConnectorId,
 
    pub terminating_branch_id: BranchId,
 
    pub execution_branch_ids: Vec<BranchId>, // no particular ordering of IDs enforced
 
    pub final_port_mapping: Vec<(PortIdLocal, BranchId)>
 
}
 

	
 
pub struct SyncConnectorConstraints {
 
    connector_id: ConnectorId,
 
    constraints: Vec<SyncBranchConstraint>,
 
    pub connector_id: ConnectorId,
 
    pub constraints: Vec<SyncBranchConstraint>,
 
}
 

	
 
pub struct SyncMessage {
 
    connector_solutions: Vec<SyncConnectorSolution>,
 
    connector_constraints: Vec<SyncConnectorConstraints>,
 
    connectors_to_visit: Vec<u32>,
 
    pub local_solutions: Vec<SyncConnectorSolution>,
 
    pub constraints: Vec<SyncConnectorConstraints>,
 
    pub to_visit: Vec<ConnectorId>,
 
}
 

	
 
// TODO: Shouldn't really be here, right?
 
impl SyncMessage {
 
    /// Creates a new sync message. Assumes that it is created by a connector
 
    /// that has just encountered a new local solution.
 
    pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self {
 
        let mut local_solutions = Vec::with_capacity(approximate_peers);
 
        local_solutions.push(initial_solution);
 

	
 
        return Self{
 
            local_solutions,
 
            constraints: Vec::with_capacity(approximate_peers),
 
            to_visit: Vec::with_capacity(approximate_peers),
 
        };
 
    }
 

	
 
    /// Checks if a connector has already provided a local solution
 
    pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool {
 
        return self.local_solutions
 
            .iter()
 
            .any(|v| v.connector_id == connector_id);
 
    }
 

	
 
    /// Adds a new constraint. If the connector has already provided a local
 
    /// solution then the constraint will be checked. Otherwise the constraint
 
    /// will be added to the solution. If this is the first constraint for a
 
    /// connector then it will be added to the connectors that still have to be
 
    /// visited.
 
    ///
 
    /// If this returns true then the constraint was added, or the local
 
    /// solution for the specified connector satisfies the constraint. If this
 
    /// function returns an error then we're dealing with a nefarious peer.
 
    pub(crate) fn add_or_check_constraint(
 
        &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint
 
    ) -> Result<bool, ()> {
 
        if self.has_local_solution_for(connector_id) {
 
            return self.check_constraint(connector_id, constraint);
 
        } else {
 
            self.add_constraint(connector_id);
 
            return Ok(true);
 
        }
 
    }
 

	
 
    /// Pushes a new connector constraint. Caller must ensure that the solution
 
    /// has not yet arrived at the specified connector (because then it would no
 
    /// longer have constraints, but a proposed solution instead).
 
    pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) {
 
        debug_assert!(!self.has_local_solution_for(connector_id));
 

	
 
        let position = self.constraints
 
            .iter()
 
            .position(|v| v.connector_id == connector_id);
 

	
 
        match position {
 
            Some(index) => {
 
                // Has pre-existing constraints
 
                debug_assert!(self.to_visit.contains(&connector_id));
 
                let entry = &mut self.constraints[index];
 
                entry.constraints.push(constraint);
 
            },
 
            None => {
 
                debug_assert!(!self.to_visit.contains(&connector_id));
 
                self.constraints.push(SyncConnectorConstraints{
 
                    connector_id,
 
                    constraints: vec![constraint],
 
                });
 
                self.to_visit.push(connector_id);
 
            }
 
        }
 
    }
 

	
 
    /// Checks if a constraint is satisfied by a solution. Caller must make sure
 
    /// that a local solution has already been provided. Will return an error
 
    /// value only if the provided constraint does not make sense (i.e. a
 
    /// nefarious peer has supplied a constraint with a port we do not own).
 
    pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result<bool, ()>  {
 
        debug_assert!(self.has_local_solution_for(connector_id));
 

	
 
        let entry = self.local_solutions
 
            .iter()
 
            .find(|v| v.connector_id == connector_id)
 
            .unwrap();
 

	
 
        match constraint {
 
            SyncBranchConstraint::SilentPort(silent_port_id) => {
 
                for (port_id, mapped_id) in &entry.final_port_mapping {
 
                    if port_id == silent_port_id {
 
                        // If silent, then mapped ID is invalid
 
                        return Ok(!mapped_id.is_valid())
 
                    }
 
                }
 

	
 
                return Err(());
 
            },
 
            SyncBranchConstraint::BranchNumber(expected_branch_id) => {
 
                return Ok(entry.execution_branch_ids.contains(&expected_branch_id));
 
            },
 
            SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => {
 
                for (port_id, mapped_id) in &entry.final_port_mapping {
 
                    if port_id == port_id {
 
                        return Ok(*mapped_id == expected_branch_id);
 
                    }
 
                }
 

	
 
                return Err(());
 
            },
 
        }
 
    }
 
}
 

	
 
/// A control message. These might be sent by the scheduler to notify eachother
src/runtime2/native.rs
Show inline comments
 
@@ -121,7 +121,7 @@ impl ApplicationInterface {
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(0, Branch::new_initial_branch(state), initial_ports);
 
        let connector = ConnectorPDL::new(Branch::new_initial_branch(state), initial_ports);
 

	
 
        // Put on job queue
 
        {
0 comments (0 inline, 0 general)