Changeset - 418aa1170154
[Not reviewed]
0 1 0
MH - 4 years ago 2021-10-14 22:23:24
contact@maxhenger.nl
initial sync solution generation
1 file changed with 41 insertions and 8 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::Deref;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 
use crate::runtime2::port::PortKind;
 
use super::global_store::ConnectorId;
 
use super::inbox::{
 
    PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage,
 
@@ -341,9 +344,12 @@ impl ConnectorPDL {
 

	
 
    }
 

	
 
    pub fn run(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    // TODO: Remove GlobalStore, is used to retrieve ports. Ports belong with
 
    //  the connector itself, half managed, half accessible (a-la PublicInbox
 
    //  and PrivateInbox)
 
    pub fn run(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, results);
 
            let scheduling = self.run_in_speculative_mode(pd, global, results);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
@@ -359,8 +365,10 @@ impl ConnectorPDL {
 

	
 
                // Transform branch into proposed global solution
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, results);
 
            let scheduling = self.run_in_deterministic_mode(pd, global, results);
 
            return scheduling;
 
        }
 
    }
 
@@ -370,7 +378,7 @@ impl ConnectorPDL {
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
@@ -562,7 +570,7 @@ impl ConnectorPDL {
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 
@@ -775,7 +783,7 @@ impl ConnectorPDL {
 
    /// Generates the initial solution for a finished sync branch. If initial
 
    /// local solution is valid, then the appropriate message is returned.
 
    /// Otherwise the initial solution is inconsistent.
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId) -> Option<SyncMessage> {
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, key: &ConnectorKey, global: &GlobalStore) -> Option<SyncMessage> {
 
        // Retrieve branchg
 
        debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode
 
        let branch = &self.branches[branch_id.index as usize];
 
@@ -811,7 +819,6 @@ impl ConnectorPDL {
 
        // Turn local port mapping into constraints on other connectors
 

	
 
        // - constraints on other components due to transferred ports
 

	
 
        for port_delta in &branch.ports_delta {
 
            // For transferred ports we always have two constraints: one for the
 
            // sender and one for the receiver, ensuring it was not used.
 
@@ -821,17 +828,43 @@ impl ConnectorPDL {
 
                return None;
 
            }
 

	
 
            if !sync_message.add_or_check_constraint()
 
            // Might need to check if we own the other side of the channel
 
            let (peer_port_id, peer_connector_id) = {
 
                let port = global.ports.get(key, port_delta.port_id);
 
                (port.peer_id, port.peer_connector)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector, SyncBranchConstraint::SilentPort(peer_port_id)).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        // - constraints on other components due to owned ports
 
        for port_index in 0..self.ports.num_ports() {
 
            let port_id = self.ports.get_port_id(port_index);
 
            let port = self.ports.get_port(branch_id.index, port_index);
 
            let (peer_port_id, peer_connector_id, is_getter) = {
 
                let port = global.ports.get(key, port_id);
 
                (port.peer_id, port.peer_connector, port.kind == PortKind::Getter)
 
            };
 

	
 
            let constraint = if port.is_assigned {
 
                if is_getter {
 
                    SyncBranchConstraint::BranchNumber(port.last_registered_branch_id)
 
                } else {
 
                    SyncBranchConstraint::PortMapping(peer_port_id, port.last_registered_branch_id)
 
                }
 
            } else {
 
                SyncBranchConstraint::SilentPort(peer_port_id)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        return Some(sync_message);
 
    }
 

	
 
    fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec<BranchId>) {
 
        debug_assert!(parents.is_empty());
0 comments (0 inline, 0 general)