Changeset - 8a530d2dc72f
[Not reviewed]
0 3 0
mh - 4 years ago 2021-10-15 17:53:09
contact@maxhenger.nl
basic message passing sync-resolving
3 files changed with 259 insertions and 9 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -6,10 +6,11 @@ use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 
use crate::runtime2::inbox::{OutgoingMessage, SolutionMessage};
 
use crate::runtime2::port::PortKind;
 
use super::global_store::ConnectorId;
 
use super::inbox::{
 
    PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage,
 
    PrivateInbox, PublicInbox, OutgoingDataMessage, DataMessage, SyncMessage,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::PortIdLocal;
 
@@ -340,8 +341,146 @@ impl ConnectorPDL {
 
        return self.in_sync;
 
    }
 

	
 
    pub fn insert_sync_message(&mut self, message: SyncMessage, results: &mut RunDeltaState) {
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s).
 
    pub fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&self.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints
 

	
 
        // TODO: Optimize, use some kind of temp workspace vector
 
        let mut execution_path_branch_ids = Vec::new();
 

	
 
        if self.sync_finished_last_handled != 0 {
 
            // We have some solutions to match against
 
            let constraints_index = message.constraints
 
                .iter()
 
                .position(|v| v.connector_id == self.id)
 
                .unwrap();
 
            let constraints = &message.constraints[constraints_index].constraints;
 
            debug_assert!(!constraints.is_empty());
 

	
 
            // Note that we only iterate over the solutions we've already
 
            // handled ourselves, not necessarily
 
            let mut branch_index = self.sync_finished.first;
 
            'branch_loop: loop {
 
                // Load solution branch
 
                let branch = &self.branches[branch_index as usize];
 
                execution_path_branch_ids.clear();
 
                self.branch_ids_of_execution_path(BranchId::new(branch_index), &mut execution_path_branch_ids);
 

	
 
                // Check if the branch matches all of the applied constraints
 
                for constraint in constraints {
 
                    match constraint {
 
                        SyncBranchConstraint::SilentPort(silent_port_id) => {
 
                            let port_index = self.ports.get_port_index(*silent_port_id);
 
                            if port_index.is_none() {
 
                                // Nefarious peer
 
                                continue 'branch_loop;
 
                            }
 
                            let port_index = port_index.unwrap();
 

	
 
                            let mapping = self.ports.get_port(branch_index, port_index);
 
                            debug_assert!(mapping.is_assigned);
 

	
 
                            if mapping.num_times_fired != 0 {
 
                                // Not silent, constraint not satisfied
 
                                continue 'branch_loop;
 
                            }
 
                        },
 
                        SyncBranchConstraint::BranchNumber(expected_branch_id) => {
 
                            if !execution_path_branch_ids.contains(expected_branch_id) {
 
                                // Not the expected execution path, constraint not satisfied
 
                                continue 'branch_loop;
 
                            }
 
                        },
 
                        SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => {
 
                            let port_index = self.ports.get_port_index(*port_id);
 
                            if port_index.is_none() {
 
                                // Nefarious peer
 
                                continue 'branch_loop;
 
                            }
 
                            let port_index = port_index.unwrap();
 

	
 
                            let mapping = self.ports.get_port(branch_index, port_index);
 
                            if mapping.last_registered_branch_id != expected_branch_id {
 
                                // Not the expected interaction on this port, constraint not satisfied
 
                                continue 'branch_loop;
 
                            }
 
                        },
 
                    }
 
                }
 

	
 
                // If here, then all of the external constraints were satisfied
 
                // for the current branch. But the branch itself also imposes
 
                // constraints. So while building up the new solution, make sure
 
                // that those are satisfied as well.
 
                // TODO: Code below can probably be merged with initial solution
 
                //  generation.
 

	
 
                // - clone old solution so we can add to it
 
                let mut new_solution = message.clone();
 

	
 
                // - determine the initial port mapping
 
                let num_ports = self.ports.num_ports();
 
                let mut new_solution_mapping = Vec::with_capacity(num_ports);
 
                for port_index in 0..self.ports.num_ports() {
 
                    let port_id = self.ports.get_port_id(port_index);
 
                    let mapping = self.ports.get_port(branch_index, port_index);
 
                    new_solution_mapping.push((port_id, mapping.last_registered_branch_id));
 
                }
 

	
 
                // - replace constraints with a local solution
 
                new_solution.constraints.remove(constraints_index);
 
                new_solution.local_solutions.push(SyncConnectorSolution{
 
                    connector_id: self.id,
 
                    terminating_branch_id: BranchId::new(branch_index),
 
                    execution_branch_ids: execution_path_branch_ids.clone(),
 
                    final_port_mapping: new_solution_mapping,
 
                });
 

	
 
                // - do a second pass on the ports to generate and add the
 
                //   constraints that should be applied to other connectors
 
                for port_index in 0..self.ports.num_ports() {
 
                    let port_id = self.ports.get_port_id(port_index);
 

	
 
                    let (peer_connector_id, peer_port_id, peer_is_getter) = {
 
                        let key = unsafe{ ConnectorKey::from_id(self.id) };
 
                        let port = global.ports.get(&key, port_id);
 
                        (port.peer_connector, port.peer_id, port.kind == PortKind::Putter)
 
                    };
 

	
 
                    let mapping = self.ports.get_port(branch_index, port_index);
 
                    let constraint = if mapping.num_times_fired == 0 {
 
                        SyncBranchConstraint::SilentPort(peer_port_id)
 
                    } else {
 
                        if peer_is_getter {
 
                            SyncBranchConstraint::PortMapping(peer_port_id, mapping.last_registered_branch_id)
 
                        } else {
 
                            SyncBranchConstraint::BranchNumber(mapping.last_registered_branch_id)
 
                        }
 
                    };
 

	
 
                    match new_solution.add_or_check_constraint(peer_connector_id, constraint) {
 
                        None => continue 'branch_loop,
 
                        Some(false) => continue 'branch_loop,
 
                        Some(true) => {},
 
                    }
 
                }
 

	
 
                // If here, then the newly generated solution is completely
 
                // compatible.
 
                Self::submit_sync_solution(new_solution, results);
 

	
 
                // Consider the next branch
 
                if branch_index == self.sync_finished_last_handled {
 
                    // At the end of the previously handled solutions
 
                    break;
 
                }
 

	
 
                debug_assert!(branch.next_branch_in_queue.is_some()); // because we cannot be at the end of the queue
 
                branch_index = branch.next_branch_in_queue.unwrap();
 
            }
 
        }
 
    }
 

	
 
    // TODO: Remove GlobalStore, is used to retrieve ports. Ports belong with
 
@@ -354,6 +493,7 @@ impl ConnectorPDL {
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                // Retrieve first element in queue
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
                    next_id = self.sync_finished.first;
 
@@ -363,7 +503,37 @@ impl ConnectorPDL {
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                // Transform branch into proposed global solution
 
                loop {
 
                    let branch_id = BranchId::new(next_id);
 
                    let branch = &self.branches[next_id as usize];
 
                    let branch_next = branch.next_branch_in_queue;
 

	
 
                    // Turn local solution into a message and send it along
 
                    // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
 
                    let connector_key = unsafe{ ConnectorKey::from_id(self.id) };
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, &connector_key, global);
 
                    if let Some(valid_solution) = solution_message {
 
                        Self::submit_sync_solution(valid_solution, results);
 
                    } else {
 
                        // Branch is actually invalid, but we only just figured
 
                        // it out. We need to mark it as invalid to prevent
 
                        // future use
 
                        Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id);
 
                        if branch_id == self.sync_finished_last_handled {
 
                            self.sync_finished_last_handled = self.sync_finished.last;
 
                        }
 

	
 
                        let branch = &mut self.branches[next_id as usize];
 
                        branch.sync_state = SpeculativeState::Inconsistent;
 
                    }
 

	
 
                    match branch_next {
 
                        Some(id) => next_id = id,
 
                        None => break,
 
                    }
 
                }
 

	
 
                self.sync_finished_last_handled = next_id;
 
            }
 

	
 
            return scheduling;
 
@@ -537,7 +707,7 @@ impl ConnectorPDL {
 
                    // Put in run results for thread to pick up and transfer to
 
                    // the correct connector inbox.
 
                    port_mapping.mark_definitive(branch.index, 1);
 
                    let message = OutgoingMessage {
 
                    let message = OutgoingDataMessage {
 
                        sending_port: local_port_id,
 
                        sender_prev_branch_id: BranchId::new_invalid(),
 
                        sender_cur_branch_id: branch.index,
 
@@ -551,7 +721,7 @@ impl ConnectorPDL {
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports);
 
                    results.ports.clear();
 

	
 
                    results.outbox.push(message);
 
                    results.outbox.push(OutgoingMessage::Data(message));
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
@@ -640,7 +810,6 @@ impl ConnectorPDL {
 
    // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming
 
    // linked lists inside of the vector of branches.
 

	
 
    #[inline]
 
    fn pop_branch(branches: &mut Vec<Branch>, queue: &mut BranchQueue) -> &mut Branch {
 
        debug_assert!(queue.first != 0);
 
        let branch = &mut branches[queue.first as usize];
 
@@ -656,7 +825,6 @@ impl ConnectorPDL {
 
        return branch;
 
    }
 

	
 
    #[inline]
 
    fn push_branch_into_queue(
 
        branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_push: BranchId,
 
    ) {
 
@@ -677,6 +845,50 @@ impl ConnectorPDL {
 
        }
 
    }
 

	
 
    /// Removes branch from linked-list queue. Walks through the entire list to
 
    /// find the element (!). Assumption is that this is not called often.
 
    fn remove_branch_from_queue(
 
        branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_delete: BranchId,
 
    ) {
 
        debug_assert!(to_delete.is_valid()); // we're deleting a valid item
 
        debug_assert!(queue.first != 0 && queue.last != 0); // queue isn't empty to begin with
 

	
 
        // Retrieve branch and its next element
 
        let branch_to_delete = &mut branches[to_delete.index as usize];
 
        let branch_next_index_option = branch_to_delete.next_branch_in_queue;
 
        let branch_next_index_unwrapped = branch_next_index_option.unwrap_or(0);
 
        branch_to_delete.next_branch_in_queue = None;
 

	
 
        // Walk through all elements in queue to find branch to delete
 
        let mut prev_index = 0;
 
        let mut next_index = queue.first;
 

	
 
        while next_index != 0 {
 
            if next_index == to_delete.index {
 
                // Found the element we're going to delete
 
                // - check if at the first element or not
 
                if prev_index == 0 {
 
                    queue.first = branch_next_index_unwrapped;
 
                } else {
 
                    let prev_branch = &mut branches[prev_index as usize];
 
                    prev_branch.next_branch_in_queue = branch_next_index_option;
 
                }
 

	
 
                // - check if at last element or not (also takes care of "no elements left in queue")
 
                if branch_next_index_option.is_none() {
 
                    queue.last = prev_index;
 
                }
 

	
 
                return;
 
            }
 

	
 
            prev_index = next_index;
 
        }
 

	
 
        // If here, then we didn't find the element
 
        panic!("branch does not exist in provided queue");
 
    }
 

	
 
    // Helpers for local port management. Specifically for adopting/losing
 
    // ownership over ports, and for checking if specific ports can be sent
 
    // over another port.
 
@@ -866,6 +1078,23 @@ impl ConnectorPDL {
 
        return Some(sync_message);
 
    }
 

	
 
    fn submit_sync_solution(partial_solution: SyncMessage, results: &mut RunDeltaState) {
 
        if partial_solution.to_visit.is_empty() {
 
            // Solution is completely consistent
 
            let mut full_solution = SolutionMessage{
 
                local_solutions: Vec::with_capacity(partial_solution.local_solutions.len()),
 
            };
 
            for local_solution in &partial_solution.local_solutions {
 
                full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id));
 
            }
 

	
 
            results.outbox.push(OutgoingMessage::Solution(full_solution));
 
        } else {
 
            // Still have connectors to visit
 
            results.outbox.push(OutgoingMessage::Sync(partial_solution));
 
        }
 
    }
 

	
 
    fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec<BranchId>) {
 
        debug_assert!(parents.is_empty());
 

	
src/runtime2/inbox.rs
Show inline comments
 
@@ -23,13 +23,19 @@ use super::global_store::ConnectorId;
 
/// A message prepared by a connector. Waiting to be picked up by the runtime to
 
/// be sent to another connector.
 
#[derive(Clone)]
 
pub struct OutgoingMessage {
 
pub struct OutgoingDataMessage {
 
    pub sending_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
 
    pub sender_cur_branch_id: BranchId, // always valid
 
    pub message: ValueGroup,
 
}
 

	
 
pub enum OutgoingMessage {
 
    Data(OutgoingDataMessage),
 
    Sync(SyncMessage),
 
    Solution(SolutionMessage),
 
}
 

	
 
/// A message that has been delivered (after being imbued with the receiving
 
/// port by the scheduler) to a connector.
 
#[derive(Clone)]
 
@@ -42,12 +48,14 @@ pub struct DataMessage {
 
    pub message: ValueGroup,
 
}
 

	
 
#[derive(Clone)]
 
pub enum SyncBranchConstraint {
 
    SilentPort(PortIdLocal),
 
    BranchNumber(BranchId),
 
    PortMapping(PortIdLocal, BranchId),
 
}
 

	
 
#[derive(Clone)]
 
pub struct SyncConnectorSolution {
 
    pub connector_id: ConnectorId,
 
    pub terminating_branch_id: BranchId,
 
@@ -55,11 +63,13 @@ pub struct SyncConnectorSolution {
 
    pub final_port_mapping: Vec<(PortIdLocal, BranchId)>
 
}
 

	
 
#[derive(Clone)]
 
pub struct SyncConnectorConstraints {
 
    pub connector_id: ConnectorId,
 
    pub constraints: Vec<SyncBranchConstraint>,
 
}
 

	
 
#[derive(Clone)]
 
pub struct SyncMessage {
 
    pub local_solutions: Vec<SyncConnectorSolution>,
 
    pub constraints: Vec<SyncConnectorConstraints>,
 
@@ -103,7 +113,7 @@ impl SyncMessage {
 
        if self.has_local_solution_for(connector_id) {
 
            return self.check_constraint(connector_id, constraint);
 
        } else {
 
            self.add_constraint(connector_id);
 
            self.add_constraint(connector_id, constraint);
 
            return Ok(true);
 
        }
 
    }
 
@@ -175,6 +185,10 @@ impl SyncMessage {
 
    }
 
}
 

	
 
pub struct SolutionMessage {
 
    pub local_solutions: Vec<(ConnectorId, BranchId)>,
 
}
 

	
 
/// A control message. These might be sent by the scheduler to notify eachother
 
/// of asynchronous state changes.
 
pub struct ControlMessage {
 
@@ -194,6 +208,7 @@ pub enum ControlMessageVariant {
 
pub enum Message {
 
    Data(DataMessage),          // data message, handled by connector
 
    Sync(SyncMessage),          // sync message, handled by both connector/scheduler
 
    Solution(SolutionMessage),  // solution message, finishing a sync round
 
    Control(ControlMessage),    // control message, handled by scheduler
 
    Ping,                       // ping message, intentionally waking up a connector (used for native connectors)
 
}
src/runtime2/scheduler.rs
Show inline comments
 
@@ -61,6 +61,12 @@ impl Scheduler {
 
                            } else {
 
                                scheduled.connector.inbox.insert_message(message);
 
                            }
 
                        },
 
                        Message::Sync(message) => {
 
                            scheduled.connector
 
                        },
 
                        Message::Solution(solution) => {
 

	
 
                        },
 
                        Message::Control(message) => {
 
                            match message.content {
0 comments (0 inline, 0 general)