Changeset - 418aa1170154
[Not reviewed]
0 1 0
MH - 4 years ago 2021-10-14 22:23:24
contact@maxhenger.nl
initial sync solution generation
1 file changed with 43 insertions and 10 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::Deref;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 
use crate::runtime2::port::PortKind;
 
use super::global_store::ConnectorId;
 
use super::inbox::{
 
    PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::PortIdLocal;
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchId {
 
    pub index: u32,
 
}
 

	
 
impl BranchId {
 
    fn new_invalid() -> Self {
 
        Self{ index: 0 }
 
    }
 

	
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        Self{ index }
 
    }
 
@@ -320,78 +323,83 @@ impl ConnectorPDL {
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            sync_finished_last_handled: 0, // none at all
 
            inbox: PrivateInbox::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
        }
 
    }
 

	
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        debug_assert!(!self.id.is_valid()); // ID should only be set once
 
        self.id = id;
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    pub fn insert_sync_message(&mut self, message: SyncMessage, results: &mut RunDeltaState) {
 

	
 
    }
 

	
 
    pub fn run(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    // TODO: Remove GlobalStore, is used to retrieve ports. Ports belong with
 
    //  the connector itself, half managed, half accessible (a-la PublicInbox
 
    //  and PrivateInbox)
 
    pub fn run(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, results);
 
            let scheduling = self.run_in_speculative_mode(pd, global, results);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
                    next_id = self.sync_finished.first;
 
                } else {
 
                    let last_handled = &self.branches[self.sync_finished_last_handled as usize];
 
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                // Transform branch into proposed global solution
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, results);
 
            let scheduling = self.run_in_deterministic_mode(pd, global, results);
 
            return scheduling;
 
        }
 
    }
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
        let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        // Match statement contains `return` statements only if the particular
 
        // run result behind handled requires an immediate re-run of the
 
        // connector.
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that does not yet have an
 
                // assigned speculative value. So we need to create those
 
                // branches
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 

	
 
@@ -541,49 +549,49 @@ impl ConnectorPDL {
 
                    debug_assert!(results.ports.is_empty());
 
                    find_ports_in_value_group(&message.message, &mut results.ports);
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports);
 
                    results.ports.clear();
 

	
 
                    results.outbox.push(message);
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result),
 
        }
 

	
 
        // Not immediately scheduling, so schedule again if there are more
 
        // branches to run
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                // Need to wait until all children are terminated
 
                // TODO: Think about how to do this?
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::NotNow;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution and reschedule immediately
 
                self.in_sync = true;
 
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index);
 
                self.branches.push(first_sync_branch);
 

	
 
@@ -754,104 +762,129 @@ impl ConnectorPDL {
 
                        // Sending ports to ourselves
 
                        debug_assert!(ports.get_port_index(*port_id).is_some());
 
                        branch.ports_delta.remove(delta_idx);
 
                        continue 'port_loop;
 
                    }
 
                }
 
            }
 

	
 
            // If here then we can safely acquire the new port
 
            branch.ports_delta.push(PortOwnershipDelta{
 
                acquired: true,
 
                port_id: *port_id,
 
            });
 
        }
 

	
 
        return Ok(())
 
    }
 

	
 
    // Helpers for generating and handling sync messages (and the solutions that
 
    // are described by those sync messages)
 

	
 
    /// Generates the initial solution for a finished sync branch. If initial
 
    /// local solution is valid, then the appropriate message is returned.
 
    /// Otherwise the initial solution is inconsistent.
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId) -> Option<SyncMessage> {
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, key: &ConnectorKey, global: &GlobalStore) -> Option<SyncMessage> {
 
        // Retrieve branchg
 
        debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode
 
        let branch = &self.branches[branch_id.index as usize];
 
        debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd);
 

	
 
        // Set up storage (this is also the storage for all of the connectors
 
        // that will be visited, hence the initial size approximation)
 
        let mut all_branch_ids = Vec::new();
 
        self.branch_ids_of_execution_path(branch_id, &mut all_branch_ids);
 

	
 
        let num_ports = self.ports.num_ports();
 
        let approximate_peers = num_ports;
 
        let mut initial_solution_port_mapping = Vec::with_capacity(num_ports);
 
        for port_idx in 0..self.ports.num_ports() {
 
            let port_id = self.ports.get_port_id(port_idx);
 
            let port_desc = self.ports.get_port(branch_id.index, port_idx);
 

	
 
            // Note: if assigned then we expect a valid branch ID. Otherwise we have the "invalid
 
            // branch" as ID, marking that we want it to be silent
 
            debug_assert!(port_desc.is_assigned == port_desc.last_registered_branch_id.is_valid());
 
            initial_solution_port_mapping.push((port_id, port_desc.last_registered_branch_id));
 
        }
 

	
 
        let initial_local_solution = SyncConnectorSolution{
 
            connector_id: self.id,
 
            terminating_branch_id: branch_id,
 
            execution_branch_ids: all_branch_ids,
 
            final_port_mapping: initial_solution_port_mapping,
 
        };
 

	
 
        let mut sync_message = SyncMessage::new(initial_local_solution, approximate_peers);
 

	
 
        // Turn local port mapping into constraints on other connectors
 

	
 
        // - constraints on other components due to transferred ports
 

	
 
        for port_delta in &branch.ports_delta {
 
            // For transferred ports we always have two constraints: one for the
 
            // sender and one for the receiver, ensuring it was not used.
 
            // TODO: This will fail if a port is passed around multiple times.
 
            //  maybe a special "passed along" entry in `ports_delta`.
 
            if !sync_message.check_constraint(self.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) {
 
                return None;
 
            }
 
            
 
            if !sync_message.add_or_check_constraint()
 

	
 
            // Might need to check if we own the other side of the channel
 
            let (peer_port_id, peer_connector_id) = {
 
                let port = global.ports.get(key, port_delta.port_id);
 
                (port.peer_id, port.peer_connector)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector, SyncBranchConstraint::SilentPort(peer_port_id)).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        // - constraints on other components due to owned ports
 
        for port_index in 0..self.ports.num_ports() {
 
            let port_id = self.ports.get_port_id(port_index);
 
            let port = self.ports.get_port(branch_id.index, port_index);
 
            let (peer_port_id, peer_connector_id, is_getter) = {
 
                let port = global.ports.get(key, port_id);
 
                (port.peer_id, port.peer_connector, port.kind == PortKind::Getter)
 
            };
 

	
 
            let constraint = if port.is_assigned {
 
                if is_getter {
 
                    SyncBranchConstraint::BranchNumber(port.last_registered_branch_id)
 
                } else {
 
                    SyncBranchConstraint::PortMapping(peer_port_id, port.last_registered_branch_id)
 
                }
 
            } else {
 
                SyncBranchConstraint::SilentPort(peer_port_id)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        return None;
 
        return Some(sync_message);
 
    }
 

	
 
    fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec<BranchId>) {
 
        debug_assert!(parents.is_empty());
 

	
 
        let mut next_branch_id = leaf_branch_id;
 
        debug_assert!(next_branch_id.is_valid());
 

	
 
        while next_branch_id.is_valid() {
 
            parents.push(next_branch_id);
 
            let branch = &self.branches[next_branch_id.index as usize];
 
            next_branch_id = branch.parent_index;
 
        }
 
    }
 
}
 

	
 
/// A data structure passed to a connector whose code is being executed that is
 
/// used to queue up various state changes that have to be applied after
 
/// running, e.g. the messages the have to be transferred to other connectors.
 
// TODO: Come up with a better name
 
pub(crate) struct RunDeltaState {
 
    // Variables that allow the thread running the connector to pick up global
 
    // state changes and try to apply them.
 
    pub outbox: Vec<OutgoingMessage>,
0 comments (0 inline, 0 general)