Changeset - 418aa1170154
[Not reviewed]
0 1 0
MH - 4 years ago 2021-10-14 22:23:24
contact@maxhenger.nl
initial sync solution generation
1 file changed with 43 insertions and 10 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::Deref;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 
use crate::runtime2::port::PortKind;
 
use super::global_store::ConnectorId;
 
use super::inbox::{
 
    PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::PortIdLocal;
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchId {
 
@@ -332,54 +335,59 @@ impl ConnectorPDL {
 
        debug_assert!(!self.id.is_valid()); // ID should only be set once
 
        self.id = id;
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    pub fn insert_sync_message(&mut self, message: SyncMessage, results: &mut RunDeltaState) {
 

	
 
    }
 

	
 
    pub fn run(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    // TODO: Remove GlobalStore, is used to retrieve ports. Ports belong with
 
    //  the connector itself, half managed, half accessible (a-la PublicInbox
 
    //  and PrivateInbox)
 
    pub fn run(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, results);
 
            let scheduling = self.run_in_speculative_mode(pd, global, results);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
                    next_id = self.sync_finished.first;
 
                } else {
 
                    let last_handled = &self.branches[self.sync_finished_last_handled as usize];
 
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                // Transform branch into proposed global solution
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, results);
 
            let scheduling = self.run_in_deterministic_mode(pd, global, results);
 
            return scheduling;
 
        }
 
    }
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
        let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        // Match statement contains `return` statements only if the particular
 
        // run result behind handled requires an immediate re-run of the
 
        // connector.
 
@@ -553,25 +561,25 @@ impl ConnectorPDL {
 
        }
 

	
 
        // Not immediately scheduling, so schedule again if there are more
 
        // branches to run
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
@@ -766,25 +774,25 @@ impl ConnectorPDL {
 
            });
 
        }
 

	
 
        return Ok(())
 
    }
 

	
 
    // Helpers for generating and handling sync messages (and the solutions that
 
    // are described by those sync messages)
 

	
 
    /// Generates the initial solution for a finished sync branch. If initial
 
    /// local solution is valid, then the appropriate message is returned.
 
    /// Otherwise the initial solution is inconsistent.
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId) -> Option<SyncMessage> {
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, key: &ConnectorKey, global: &GlobalStore) -> Option<SyncMessage> {
 
        // Retrieve branchg
 
        debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode
 
        let branch = &self.branches[branch_id.index as usize];
 
        debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd);
 

	
 
        // Set up storage (this is also the storage for all of the connectors
 
        // that will be visited, hence the initial size approximation)
 
        let mut all_branch_ids = Vec::new();
 
        self.branch_ids_of_execution_path(branch_id, &mut all_branch_ids);
 

	
 
        let num_ports = self.ports.num_ports();
 
        let approximate_peers = num_ports;
 
@@ -802,44 +810,69 @@ impl ConnectorPDL {
 
        let initial_local_solution = SyncConnectorSolution{
 
            connector_id: self.id,
 
            terminating_branch_id: branch_id,
 
            execution_branch_ids: all_branch_ids,
 
            final_port_mapping: initial_solution_port_mapping,
 
        };
 

	
 
        let mut sync_message = SyncMessage::new(initial_local_solution, approximate_peers);
 

	
 
        // Turn local port mapping into constraints on other connectors
 

	
 
        // - constraints on other components due to transferred ports
 

	
 
        for port_delta in &branch.ports_delta {
 
            // For transferred ports we always have two constraints: one for the
 
            // sender and one for the receiver, ensuring it was not used.
 
            // TODO: This will fail if a port is passed around multiple times.
 
            //  maybe a special "passed along" entry in `ports_delta`.
 
            if !sync_message.check_constraint(self.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) {
 
                return None;
 
            }
 
            
 
            if !sync_message.add_or_check_constraint()
 

	
 
            // Might need to check if we own the other side of the channel
 
            let (peer_port_id, peer_connector_id) = {
 
                let port = global.ports.get(key, port_delta.port_id);
 
                (port.peer_id, port.peer_connector)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector, SyncBranchConstraint::SilentPort(peer_port_id)).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        // - constraints on other components due to owned ports
 
        for port_index in 0..self.ports.num_ports() {
 
            let port_id = self.ports.get_port_id(port_index);
 
            let port = self.ports.get_port(branch_id.index, port_index);
 
            let (peer_port_id, peer_connector_id, is_getter) = {
 
                let port = global.ports.get(key, port_id);
 
                (port.peer_id, port.peer_connector, port.kind == PortKind::Getter)
 
            };
 

	
 
            let constraint = if port.is_assigned {
 
                if is_getter {
 
                    SyncBranchConstraint::BranchNumber(port.last_registered_branch_id)
 
                } else {
 
                    SyncBranchConstraint::PortMapping(peer_port_id, port.last_registered_branch_id)
 
                }
 
            } else {
 
                SyncBranchConstraint::SilentPort(peer_port_id)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        return None;
 
        return Some(sync_message);
 
    }
 

	
 
    fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec<BranchId>) {
 
        debug_assert!(parents.is_empty());
 

	
 
        let mut next_branch_id = leaf_branch_id;
 
        debug_assert!(next_branch_id.is_valid());
 

	
 
        while next_branch_id.is_valid() {
 
            parents.push(next_branch_id);
 
            let branch = &self.branches[next_branch_id.index as usize];
 
            next_branch_id = branch.parent_index;
0 comments (0 inline, 0 general)