Changeset - 418aa1170154
[Not reviewed]
0 1 0
MH - 4 years ago 2021-10-14 22:23:24
contact@maxhenger.nl
initial sync solution generation
1 file changed with 43 insertions and 10 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::Deref;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 
use crate::runtime2::port::PortKind;
 
use super::global_store::ConnectorId;
 
use super::inbox::{
 
    PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::PortIdLocal;
 
@@ -338,15 +341,18 @@ impl ConnectorPDL {
 
    }
 

	
 
    pub fn insert_sync_message(&mut self, message: SyncMessage, results: &mut RunDeltaState) {
 

	
 
    }
 

	
 
    pub fn run(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    // TODO: Remove GlobalStore, is used to retrieve ports. Ports belong with
 
    //  the connector itself, half managed, half accessible (a-la PublicInbox
 
    //  and PrivateInbox)
 
    pub fn run(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, results);
 
            let scheduling = self.run_in_speculative_mode(pd, global, results);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
@@ -356,24 +362,26 @@ impl ConnectorPDL {
 
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                // Transform branch into proposed global solution
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, results);
 
            let scheduling = self.run_in_deterministic_mode(pd, global, results);
 
            return scheduling;
 
        }
 
    }
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
        let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
@@ -559,13 +567,13 @@ impl ConnectorPDL {
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 
@@ -772,13 +780,13 @@ impl ConnectorPDL {
 
    // Helpers for generating and handling sync messages (and the solutions that
 
    // are described by those sync messages)
 

	
 
    /// Generates the initial solution for a finished sync branch. If initial
 
    /// local solution is valid, then the appropriate message is returned.
 
    /// Otherwise the initial solution is inconsistent.
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId) -> Option<SyncMessage> {
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, key: &ConnectorKey, global: &GlobalStore) -> Option<SyncMessage> {
 
        // Retrieve branchg
 
        debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode
 
        let branch = &self.branches[branch_id.index as usize];
 
        debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd);
 

	
 
        // Set up storage (this is also the storage for all of the connectors
 
@@ -808,32 +816,57 @@ impl ConnectorPDL {
 

	
 
        let mut sync_message = SyncMessage::new(initial_local_solution, approximate_peers);
 

	
 
        // Turn local port mapping into constraints on other connectors
 

	
 
        // - constraints on other components due to transferred ports
 

	
 
        for port_delta in &branch.ports_delta {
 
            // For transferred ports we always have two constraints: one for the
 
            // sender and one for the receiver, ensuring it was not used.
 
            // TODO: This will fail if a port is passed around multiple times.
 
            //  maybe a special "passed along" entry in `ports_delta`.
 
            if !sync_message.check_constraint(self.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) {
 
                return None;
 
            }
 
            
 
            if !sync_message.add_or_check_constraint()
 

	
 
            // Might need to check if we own the other side of the channel
 
            let (peer_port_id, peer_connector_id) = {
 
                let port = global.ports.get(key, port_delta.port_id);
 
                (port.peer_id, port.peer_connector)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector, SyncBranchConstraint::SilentPort(peer_port_id)).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        // - constraints on other components due to owned ports
 
        for port_index in 0..self.ports.num_ports() {
 
            let port_id = self.ports.get_port_id(port_index);
 
            let port = self.ports.get_port(branch_id.index, port_index);
 
            let (peer_port_id, peer_connector_id, is_getter) = {
 
                let port = global.ports.get(key, port_id);
 
                (port.peer_id, port.peer_connector, port.kind == PortKind::Getter)
 
            };
 

	
 
            let constraint = if port.is_assigned {
 
                if is_getter {
 
                    SyncBranchConstraint::BranchNumber(port.last_registered_branch_id)
 
                } else {
 
                    SyncBranchConstraint::PortMapping(peer_port_id, port.last_registered_branch_id)
 
                }
 
            } else {
 
                SyncBranchConstraint::SilentPort(peer_port_id)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        return None;
 
        return Some(sync_message);
 
    }
 

	
 
    fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec<BranchId>) {
 
        debug_assert!(parents.is_empty());
 

	
 
        let mut next_branch_id = leaf_branch_id;
0 comments (0 inline, 0 general)