Changeset - b4ac681e0e7f
[Not reviewed]
0 6 0
mh - 4 years ago 2021-10-18 12:59:40
contact@maxhenger.nl
WIP on message-based sync impl
6 files changed with 515 insertions and 405 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -5,9 +5,11 @@ use std::sync::atomic::AtomicBool;
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 
use crate::runtime2::inbox::{OutgoingMessage, SolutionMessage};
 
use crate::runtime2::global_store::ConnectorKey;
 
use crate::runtime2::inbox::{MessageContents, OutgoingMessage, SolutionMessage};
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::PortKind;
 
use crate::runtime2::scheduler::ConnectorCtx;
 
use super::global_store::ConnectorId;
 
use super::inbox::{
 
    PrivateInbox, PublicInbox, OutgoingDataMessage, DataMessage, SyncMessage,
 
@@ -97,6 +99,15 @@ impl Branch {
 
            ports_delta: parent_branch.ports_delta.clone(),
 
        }
 
    }
 

	
 
    fn commit_to_sync(&mut self) {
 
        self.index = BranchId::new(0);
 
        self.parent_index = BranchId::new_invalid();
 
        self.sync_state = SpeculativeState::RunningNonSync;
 
        self.next_branch_in_queue = None;
 
        self.received.clear();
 
        self.ports_delta.clear();
 
    }
 
}
 

	
 
#[derive(Clone)]
 
@@ -141,8 +152,7 @@ enum PortOwnershipError {
 
    AlreadyGivenAway(PortIdLocal)
 
}
 

	
 
/// As the name implies, this contains a description of the ports associated
 
/// with a connector.
 
/// Contains a description of the port mapping during a particular sync session.
 
/// TODO: Extend documentation
 
pub(crate) struct ConnectorPorts {
 
    // Essentially a mapping from `port_index` to `port_id`.
 
@@ -184,6 +194,24 @@ impl ConnectorPorts {
 
        }
 
    }
 

	
 
    /// Adds a new port. Caller must make sure that the connector is not in the
 
    /// sync phase.
 
    fn add_port(&mut self, port_id: PortIdLocal) {
 
        debug_assert!(self.port_mapping.len() == self.owned_ports.len());
 
        debug_assert!(!self.owned_ports.contains(&port_id));
 
        self.owned_ports.push(port_id);
 
        self.port_mapping.push(PortAssignment::new_unassigned());
 
    }
 

	
 
    /// Commits to a particular branch. Essentially just removes the port
 
    /// mapping information generated during the sync phase.
 
    fn commit_to_sync(&mut self) {
 
        self.port_mapping.truncate(self.owned_ports.len());
 
        debug_assert!(self.port_mapping.iter().all(|v| {
 
            !v.is_assigned && !v.last_registered_branch_id.is_valid()
 
        }));
 
    }
 

	
 
    /// Removes a particular port from the connector. May only be done if the
 
    /// connector is in non-sync mode
 
    fn remove_port(&mut self, port_id: PortIdLocal) {
 
@@ -251,14 +279,22 @@ struct BranchQueue {
 
}
 

	
 
impl BranchQueue {
 
    #[inline]
 
    fn new() -> Self {
 
        Self{ first: 0, last: 0 }
 
    }
 

	
 
    #[inline]
 
    fn is_empty(&self) -> bool {
 
        debug_assert!((self.first == 0) == (self.last == 0));
 
        return self.first == 0;
 
    }
 

	
 
    #[inline]
 
    fn clear(&mut self) {
 
        self.first = 0;
 
        self.last = 0;
 
    }
 
}
 

	
 
/// Public fields of the connector that can be freely shared between multiple
 
@@ -281,7 +317,6 @@ impl ConnectorPublic {
 
// TODO: Do this outside of the connector, create a wrapping struct
 
pub(crate) struct ConnectorPDL {
 
    // State and properties of connector itself
 
    id: ConnectorId,
 
    in_sync: bool,
 
    // Branch management
 
    branches: Vec<Branch>, // first branch is always non-speculative one
 
@@ -289,7 +324,9 @@ pub(crate) struct ConnectorPDL {
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    sync_finished_last_handled: u32,
 
    cur_round: u32,
 
    // Port/message management
 
    pub committed_to: Option<(ConnectorId, u64)>,
 
    pub inbox: PrivateInbox,
 
    pub ports: ConnectorPorts,
 
}
 
@@ -313,43 +350,112 @@ impl RunContext for TempCtx {
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message {
 
            MC::Data(message) => self.handle_data_message(message),
 
            MC::Sync(message) => self.handle_sync_message(message, ctx, delta_state),
 
            MC::RequestCommit(message) => self.handle_request_commit_message(message, ctx, delta_state),
 
            MC::ConfirmCommit(message) => self.handle_confirm_commit_message(message, ctx, delta_state),
 
            MC::Control(_) | MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, ctx, results);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                // Retrieve first element in queue
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
                    next_id = self.sync_finished.first;
 
                } else {
 
                    let last_handled = &self.branches[self.sync_finished_last_handled as usize];
 
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                loop {
 
                    let branch_id = BranchId::new(next_id);
 
                    let branch = &self.branches[next_id as usize];
 
                    let branch_next = branch.next_branch_in_queue;
 

	
 
                    // Turn local solution into a message and send it along
 
                    // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, ctx);
 
                    if let Some(valid_solution) = solution_message {
 
                        self.submit_sync_solution(valid_solution, ctx, results);
 
                    } else {
 
                        // Branch is actually invalid, but we only just figured
 
                        // it out. We need to mark it as invalid to prevent
 
                        // future use
 
                        Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id);
 
                        if branch_id == self.sync_finished_last_handled {
 
                            self.sync_finished_last_handled = self.sync_finished.last;
 
                        }
 

	
 
                        let branch = &mut self.branches[next_id as usize];
 
                        branch.sync_state = SpeculativeState::Inconsistent;
 
                    }
 

	
 
                    match branch_next {
 
                        Some(id) => next_id = id,
 
                        None => break,
 
                    }
 
                }
 

	
 
                self.sync_finished_last_handled = next_id;
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, ctx, results);
 
            return scheduling;
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    /// Constructs a representation of a connector. The assumption is that the
 
    /// initial branch is at the first instruction of the connector's code,
 
    /// hence is in a non-sync state. Note that the initial ID is invalid, we
 
    /// assume the connector will get inserted into the runtime, there it will
 
    /// receive its ID.
 
    /// hence is in a non-sync state.
 
    pub fn new(initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            sync_finished_last_handled: 0, // none at all
 
            cur_round: 0,
 
            committed_to: None,
 
            inbox: PrivateInbox::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
        }
 
    }
 

	
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        debug_assert!(!self.id.is_valid()); // ID should only be set once
 
        self.id = id;
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    pub fn insert_data_message(&mut self, message: DataMessage) {
 
    // -------------------------------------------------------------------------
 
    // Handling connector messages
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    pub fn handle_data_message(&mut self, message: DataMessage) {
 
        self.inbox.insert_message(message);
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s).
 
    pub fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&self.id)); // own ID already removed
 
    /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate.
 
    pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints
 

	
 
        // TODO: Optimize, use some kind of temp workspace vector
 
@@ -359,7 +465,7 @@ impl ConnectorPDL {
 
            // We have some solutions to match against
 
            let constraints_index = message.constraints
 
                .iter()
 
                .position(|v| v.connector_id == self.id)
 
                .position(|v| v.connector_id == ctx.id)
 
                .unwrap();
 
            let constraints = &message.constraints[constraints_index].constraints;
 
            debug_assert!(!constraints.is_empty());
 
@@ -437,7 +543,7 @@ impl ConnectorPDL {
 
                // - replace constraints with a local solution
 
                new_solution.constraints.remove(constraints_index);
 
                new_solution.local_solutions.push(SyncConnectorSolution{
 
                    connector_id: self.id,
 
                    connector_id: ctx.id,
 
                    terminating_branch_id: BranchId::new(branch_index),
 
                    execution_branch_ids: execution_path_branch_ids.clone(),
 
                    final_port_mapping: new_solution_mapping,
 
@@ -449,8 +555,7 @@ impl ConnectorPDL {
 
                    let port_id = self.ports.get_port_id(port_index);
 

	
 
                    let (peer_connector_id, peer_port_id, peer_is_getter) = {
 
                        let key = unsafe{ ConnectorKey::from_id(self.id) };
 
                        let port = global.ports.get(&key, port_id);
 
                        let port = ctx.get_port(port_id);
 
                        (port.peer_connector, port.peer_id, port.kind == PortKind::Putter)
 
                    };
 

	
 
@@ -474,7 +579,7 @@ impl ConnectorPDL {
 

	
 
                // If here, then the newly generated solution is completely
 
                // compatible.
 
                Self::submit_sync_solution(new_solution, results);
 
                self.submit_sync_solution(new_solution, ctx, results);
 

	
 
                // Consider the next branch
 
                if branch_index == self.sync_finished_last_handled {
 
@@ -488,76 +593,103 @@ impl ConnectorPDL {
 
        }
 
    }
 

	
 
    // TODO: Remove GlobalStore, is used to retrieve ports. Ports belong with
 
    //  the connector itself, half managed, half accessible (a-la PublicInbox
 
    //  and PrivateInbox)
 
    pub fn run(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, global, results);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                // Retrieve first element in queue
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
                    next_id = self.sync_finished.first;
 
                } else {
 
                    let last_handled = &self.branches[self.sync_finished_last_handled as usize];
 
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                loop {
 
                    let branch_id = BranchId::new(next_id);
 
                    let branch = &self.branches[next_id as usize];
 
                    let branch_next = branch.next_branch_in_queue;
 

	
 
                    // Turn local solution into a message and send it along
 
                    // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
 
                    let connector_key = unsafe{ ConnectorKey::from_id(self.id) };
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, &connector_key, global);
 
                    if let Some(valid_solution) = solution_message {
 
                        Self::submit_sync_solution(valid_solution, results);
 
                    } else {
 
                        // Branch is actually invalid, but we only just figured
 
                        // it out. We need to mark it as invalid to prevent
 
                        // future use
 
                        Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id);
 
                        if branch_id == self.sync_finished_last_handled {
 
                            self.sync_finished_last_handled = self.sync_finished.last;
 
                        }
 
    fn handle_request_commit_message(&mut self, mut message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        let should_propagate_message = match &self.committed_to {
 
            Some((previous_origin, previous_comparison)) => {
 
                // Already committed to something. So will commit to this if it
 
                // takes precedence over the current solution
 
                message.comparison_number > *previous_comparison ||
 
                    (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_comparison.0)
 
            },
 
            None => {
 
                // Not yet committed to a solution, so commit to this one
 
                true
 
            }
 
        };
 

	
 
                        let branch = &mut self.branches[next_id as usize];
 
                        branch.sync_state = SpeculativeState::Inconsistent;
 
                    }
 

	
 
                    match branch_next {
 
                        Some(id) => next_id = id,
 
                        None => break,
 
        if should_propagate_message {
 
            self.committed_to = Some((message.connector_origin, message.comparison_number));
 

	
 
            if message.to_visit.is_empty() {
 
                // Visited all of the connectors, so every connector can now
 
                // apply the solution
 
                // TODO: Use temporary workspace
 
                let mut to_visit = Vec::with_capacity(message.local_solutions.len() - 1);
 
                for (connector_id, _) in &message.local_solutions {
 
                    if *connector_id != ctx.id {
 
                        to_visit.push(*connector_id);
 
                    }
 
                }
 

	
 
                self.sync_finished_last_handled = next_id;
 
                message.to_visit = to_visit;
 
                self.handle_confirm_commit_message(message.clone(), ctx, delta_state);
 
                delta_state.outbox.push(MessageContents::ConfirmCommit(message));
 
            } else {
 
                // Not yet visited all of the connectors
 
                delta_state.outbox.push(MessageContents::RequestCommit(message));
 
            }
 
        }
 
    }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, global, results);
 
            return scheduling;
 
    fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        // Make sure this is the message we actually committed to. As long as
 
        // we're running on a single machine this is fine.
 
        // TODO: Take care of nefarious peers
 
        let (expected_connector_id, expected_comparison_number) =
 
            self.committed_to.unwrap();
 
        assert_eq!(message.connector_origin, expected_connector_id);
 
        assert_eq!(message.comparison_number, expected_comparison_number);
 

	
 
        // Find the branch we're supposed to commit to
 
        let (_, branch_id) = message.local_solutions
 
            .iter()
 
            .find(|(id, _)| *id == ctx.id)
 
            .unwrap();
 
        let branch_id = *branch_id;
 

	
 
        // Commit to the branch. That is: move the solution branch to the first
 
        // of the connector's branches
 
        self.in_sync = false;
 
        self.branches.swap(0, branch_id.index as usize);
 
        self.branches.truncate(1); // TODO: Or drain and do not deallocate?
 
        let solution = &mut self.branches[0];
 

	
 
        // Clear all of the other sync-related variables
 
        self.sync_active.clear();
 
        self.sync_pending_get.clear();
 
        self.sync_finished.clear();
 
        self.sync_finished_last_handled = 0;
 
        self.cur_round += 1;
 

	
 
        self.committed_to = None;
 
        self.inbox.clear();
 
        self.ports.commit_to_sync();
 

	
 
        // Add/remove any of the ports we lost during the sync phase
 
        for port_delta in &solution.ports_delta {
 
            if port_delta.acquired {
 
                self.ports.add_port(port_delta.port_id);
 
            } else {
 
                self.ports.remove_port(port_delta.port_id);
 
            }
 
        }
 
        solution.commit_to_sync();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Executing connector code
 
    // -------------------------------------------------------------------------
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
        let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active);
 
        let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        let mut run_context = TempCtx{};
 
@@ -745,7 +877,7 @@ impl ConnectorPDL {
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 
@@ -815,7 +947,8 @@ impl ConnectorPDL {
 
    // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming
 
    // linked lists inside of the vector of branches.
 

	
 
    fn pop_branch(branches: &mut Vec<Branch>, queue: &mut BranchQueue) -> &mut Branch {
 
    /// Pops from front of linked-list branch queue.
 
    fn pop_branch_from_queue(branches: &mut Vec<Branch>, queue: &mut BranchQueue) -> &mut Branch {
 
        debug_assert!(queue.first != 0);
 
        let branch = &mut branches[queue.first as usize];
 
        *queue.first = branch.next_branch_in_queue.unwrap_or(0);
 
@@ -830,6 +963,7 @@ impl ConnectorPDL {
 
        return branch;
 
    }
 

	
 
    /// Pushes branch at the end of the linked-list branch queue.
 
    fn push_branch_into_queue(
 
        branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_push: BranchId,
 
    ) {
 
@@ -1000,7 +1134,7 @@ impl ConnectorPDL {
 
    /// Generates the initial solution for a finished sync branch. If initial
 
    /// local solution is valid, then the appropriate message is returned.
 
    /// Otherwise the initial solution is inconsistent.
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, key: &ConnectorKey, global: &GlobalStore) -> Option<SyncMessage> {
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, ctx: &ConnectorCtx) -> Option<SyncMessage> {
 
        // Retrieve branchg
 
        debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode
 
        let branch = &self.branches[branch_id.index as usize];
 
@@ -1025,7 +1159,7 @@ impl ConnectorPDL {
 
        }
 

	
 
        let initial_local_solution = SyncConnectorSolution{
 
            connector_id: self.id,
 
            connector_id: ctx.id,
 
            terminating_branch_id: branch_id,
 
            execution_branch_ids: all_branch_ids,
 
            final_port_mapping: initial_solution_port_mapping,
 
@@ -1041,17 +1175,13 @@ impl ConnectorPDL {
 
            // sender and one for the receiver, ensuring it was not used.
 
            // TODO: This will fail if a port is passed around multiple times.
 
            //  maybe a special "passed along" entry in `ports_delta`.
 
            if !sync_message.check_constraint(self.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) {
 
            if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) {
 
                return None;
 
            }
 

	
 
            // Might need to check if we own the other side of the channel
 
            let (peer_port_id, peer_connector_id) = {
 
                let port = global.ports.get(key, port_delta.port_id);
 
                (port.peer_id, port.peer_connector)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector, SyncBranchConstraint::SilentPort(peer_port_id)).unwrap() {
 
            let port = ctx.get_port(port_delta.port_id);
 
            if !sync_message.add_or_check_constraint(port.peer_connector, SyncBranchConstraint::SilentPort(port.peer_id)).unwrap() {
 
                return None;
 
            }
 
        }
 
@@ -1059,20 +1189,17 @@ impl ConnectorPDL {
 
        // - constraints on other components due to owned ports
 
        for port_index in 0..self.ports.num_ports() {
 
            let port_id = self.ports.get_port_id(port_index);
 
            let port = self.ports.get_port(branch_id.index, port_index);
 
            let (peer_port_id, peer_connector_id, is_getter) = {
 
                let port = global.ports.get(key, port_id);
 
                (port.peer_id, port.peer_connector, port.kind == PortKind::Getter)
 
            };
 
            let port_mapping = self.ports.get_port(branch_id.index, port_index);
 
            let port = ctx.get_port(port_id);
 

	
 
            let constraint = if port.is_assigned {
 
                if is_getter {
 
                    SyncBranchConstraint::BranchNumber(port.last_registered_branch_id)
 
            let constraint = if port_mapping.is_assigned {
 
                if port.kind == PortKind::Getter {
 
                    SyncBranchConstraint::BranchNumber(port_mapping.last_registered_branch_id)
 
                } else {
 
                    SyncBranchConstraint::PortMapping(peer_port_id, port.last_registered_branch_id)
 
                    SyncBranchConstraint::PortMapping(port.peer_id, port_mapping.last_registered_branch_id)
 
                }
 
            } else {
 
                SyncBranchConstraint::SilentPort(peer_port_id)
 
                SyncBranchConstraint::SilentPort(port.peer_id)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() {
 
@@ -1083,20 +1210,38 @@ impl ConnectorPDL {
 
        return Some(sync_message);
 
    }
 

	
 
    fn submit_sync_solution(partial_solution: SyncMessage, results: &mut RunDeltaState) {
 
    fn submit_sync_solution(&mut self, partial_solution: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        if partial_solution.to_visit.is_empty() {
 
            // Solution is completely consistent
 
            // Solution is completely consistent. So ask everyone to commit
 
            // TODO: Maybe another package for random?
 
            let comparison_number: u64 = unsafe {
 
                let mut random_array = [0u8; 8];
 
                getrandom::getrandom(&mut random_array);
 
                std::mem::transmute(random_array)
 
            };
 

	
 
            let num_local = partial_solution.local_solutions.len();
 

	
 
            let mut full_solution = SolutionMessage{
 
                local_solutions: Vec::with_capacity(partial_solution.local_solutions.len()),
 
                comparison_number,
 
                connector_origin: ctx.id,
 
                local_solutions: Vec::with_capacity(num_local),
 
                to_visit: Vec::with_capacity(num_local - 1),
 
            };
 

	
 
            for local_solution in &partial_solution.local_solutions {
 
                full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id));
 
                if local_solution.connector_id != ctx.id {
 
                    full_solution.to_visit.push(local_solution.connector_id);
 
                }
 
            }
 

	
 
            results.outbox.push(OutgoingMessage::Solution(full_solution));
 
            debug_assert!(self.committed_to.is_none());
 
            self.committed_to = Some((full_solution.connector_origin, full_solution.comparison_number));
 
            results.outbox.push(MessageContents::RequestCommit(full_solution));
 
        } else {
 
            // Still have connectors to visit
 
            results.outbox.push(OutgoingMessage::Sync(partial_solution));
 
            results.outbox.push(MessageContents::Sync(partial_solution));
 
        }
 
    }
 

	
 
@@ -1121,7 +1266,7 @@ impl ConnectorPDL {
 
pub(crate) struct RunDeltaState {
 
    // Variables that allow the thread running the connector to pick up global
 
    // state changes and try to apply them.
 
    pub outbox: Vec<OutgoingMessage>,
 
    pub outbox: Vec<MessageContents>,
 
    pub new_connectors: Vec<ConnectorPDL>,
 
    // Workspaces
 
    pub ports: Vec<PortIdLocal>,
 
@@ -1145,7 +1290,6 @@ pub(crate) enum ConnectorScheduling {
 
    NotNow,         // Do not reschedule for running
 
}
 

	
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
src/runtime2/global_store.rs
Show inline comments
 
use std::ptr;
 
use std::sync::{Arc, Barrier, RwLock, RwLockReadGuard};
 
use std::sync::atomic::{AtomicBool, AtomicU32};
 

	
 
use crate::collections::{MpmcQueue, RawVec};
 

	
 
use super::connector::{ConnectorPDL, ConnectorPublic};
 
@@ -5,13 +9,11 @@ use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
 
use super::inbox::PublicInbox;
 
use super::scheduler::Router;
 

	
 
use std::ptr;
 
use std::sync::{Barrier, RwLock, RwLockReadGuard};
 
use std::sync::atomic::AtomicBool;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
 
use crate::runtime2::inbox::{DataMessage, SyncMessage};
 
use crate::runtime2::inbox::{DataMessage, MessageContents, SyncMessage};
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::scheduler::ConnectorCtx;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
@@ -39,7 +41,7 @@ impl ConnectorKey {
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Copy, Clone)]
 
pub(crate) struct ConnectorId(u32);
 
pub(crate) struct ConnectorId(pub u32);
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
@@ -64,31 +66,25 @@ pub enum ConnectorVariant {
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn insert_data_message(&mut self, message: DataMessage) {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.insert_data_message(message),
 
            ConnectorVariant::Native(c) => c.insert_data_message(message),
 
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
 
        }
 
    }
 

	
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) {
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.insert_sync_message(message, global, delta_state),
 
            ConnectorVariant::Native(c) => c.insert_sync_message(message, global, delta_state),
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, global, delta_state),
 
            ConnectorVariant::Native(c) => c.run(protocol_description, global, delta_state),
 
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
 
        }
 
    }
 
}
 

	
 
pub struct ScheduledConnector {
 
    pub connector: ConnectorVariant,
 
    pub public: ConnectorPublic,
 
    pub connector: ConnectorVariant, // access by connector
 
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: Router,
 
}
 

	
 
@@ -97,7 +93,8 @@ pub struct ScheduledConnector {
 
/// Otherwise one has shared access.
 
///
 
/// This datastructure is built to be wrapped in a RwLock.
 
struct ConnectorStore {
 
pub(crate) struct ConnectorStore {
 
    pub(crate) port_counter: Arc<AtomicU32>,
 
    inner: RwLock<ConnectorStoreInner>,
 
}
 

	
 
@@ -109,6 +106,7 @@ struct ConnectorStoreInner {
 
impl ConnectorStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        return Self{
 
            port_counter: Arc::new(AtomicU32::new(0)),
 
            inner: RwLock::new(ConnectorStoreInner {
 
                connectors: RawVec::with_capacity(capacity),
 
                free: Vec::with_capacity(capacity),
 
@@ -141,38 +139,53 @@ impl ConnectorStore {
 

	
 
    /// Create a new connector, returning the key that can be used to retrieve
 
    /// and/or queue it.
 
    pub(crate) fn create(&self, connector: ConnectorVariant) -> ConnectorKey {
 
        let lock = self.inner.write().unwrap();
 
        let connector = ScheduledConnector{
 
            connector,
 
            public: ConnectorPublic::new(),
 
            router: Router::new(),
 
        };
 
    pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant) -> ConnectorKey {
 
        // Creation of the connector in the global store, requires a lock
 
        {
 
            let lock = self.inner.write().unwrap();
 
            let connector = ScheduledConnector {
 
                connector,
 
                context: ConnectorCtx::new(self.port_counter.clone()),
 
                public: ConnectorPublic::new(),
 
                router: Router::new(),
 
            };
 

	
 
        let index;
 
        if lock.free.is_empty() {
 
            let connector = Box::into_raw(Box::new(connector));
 
            let index;
 
            if lock.free.is_empty() {
 
                let connector = Box::into_raw(Box::new(connector));
 

	
 
            unsafe {
 
                // Cheating a bit here. Anyway, move to heap, store in list
 
                index = lock.connectors.len();
 
                lock.connectors.push(connector);
 
            }
 
        } else {
 
            index = lock.free.pop().unwrap();
 
                unsafe {
 
                    // Cheating a bit here. Anyway, move to heap, store in list
 
                    index = lock.connectors.len();
 
                    lock.connectors.push(connector);
 
                }
 
            } else {
 
                index = lock.free.pop().unwrap();
 

	
 
            unsafe {
 
                let target = lock.connectors.get_mut(index);
 
                debug_assert!(!target.is_null());
 
                ptr::write(*target, connector);
 
                unsafe {
 
                    let target = lock.connectors.get_mut(index);
 
                    debug_assert!(!target.is_null());
 
                    ptr::write(*target, connector);
 
                }
 
            }
 
        }
 

	
 
        // TODO: Clean up together with the trait
 
        // Setting of new connector's ID
 
        let key = ConnectorKey{ index: index as u32 };
 
        let connector = self.get_mut(&key);
 
        if let ConnectorVariant::UserDefined(connector) = &mut connector.connector {
 
            connector.set_connector_id(key.downcast());
 
        let new_connector = self.get_mut(&key);
 
        new_connector.context.id = key.downcast();
 

	
 
        // Transferring ownership of ports (and crashing if there is a
 
        // programmer's mistake in port management)
 
        match &new_connector.connector {
 
            ConnectorVariant::UserDefined(connector) => {
 
                for port_id in &connector.ports.owned_ports {
 
                    let mut port = created_by.context.remove_port(*port_id);
 
                    port.owning_connector = new_connector.context.id;
 
                    new_connector.context.add_port(port);
 
                }
 
            },
 
            ConnectorVariant::Native(_) => {}, // no initial ports (yet!)
 
        }
 

	
 
        return key;
 
@@ -204,127 +217,6 @@ impl Drop for ConnectorStore {
 
    }
 
}
 

	
 
/// The registry of all ports
 
pub struct PortStore {
 
    inner: RwLock<PortStoreInner>,
 
}
 

	
 
struct PortStoreInner {
 
    ports: RawVec<Port>,
 
    free: Vec<usize>,
 
}
 

	
 
impl PortStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        Self{
 
            inner: RwLock::new(PortStoreInner{
 
                ports: RawVec::with_capacity(capacity),
 
                free: Vec::with_capacity(capacity),
 
            }),
 
        }
 
    }
 

	
 
    pub(crate) fn get(&self, key: &ConnectorKey, port_id: PortIdLocal) -> PortRef {
 
        let lock = self.inner.read().unwrap();
 
        debug_assert!(port_id.is_valid());
 

	
 
        unsafe {
 
            let port = lock.ports.get_mut(port_id.index as usize);
 
            let port = &mut *port;
 
            debug_assert_eq!(port.owning_connector_id, key.index); // race condition (if they are not equal, which should never happen), better than nothing
 

	
 
            return PortRef{ lock, port };
 
        }
 
    }
 

	
 
    pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> Channel {
 
        let mut lock = self.inner.write().unwrap();
 

	
 
        // Reserves a new port. Doesn't point it to its counterpart
 
        fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: ConnectorId) -> u32 {
 
            let index;
 

	
 
            if lock.free.is_empty() {
 
                index = lock.ports.len() as u32;
 
                lock.ports.push(Port{
 
                    self_id: PortIdLocal::new(index),
 
                    peer_id: PortIdLocal::new_invalid(),
 
                    kind,
 
                    ownership: PortOwnership::Owned,
 
                    owning_connector: connector_id,
 
                    peer_connector: connector_id
 
                });
 
            } else {
 
                index = lock.free.pop().unwrap() as u32;
 
                let port = unsafe{ &mut *lock.ports.get_mut(index as usize) };
 

	
 
                port.peer_id = PortIdLocal::new_invalid();
 
                port.kind = kind;
 
                port.ownership = PortOwnership::Owned;
 
                port.owning_connector = connector_id;
 
                port.peer_connector = connector_id;
 
            }
 

	
 
            return index;
 
        }
 

	
 
        // Create the ports
 
        let putter_id = reserve_port(&mut lock, PortKind::Putter, creating_connector);
 
        let getter_id = reserve_port(&mut lock, PortKind::Getter, creating_connector);
 
        debug_assert_ne!(putter_id, getter_id);
 

	
 
        // Point them to one another
 
        unsafe {
 
            let putter_port = &mut *lock.ports.get_mut(putter_id as usize);
 
            let getter_port = &mut *lock.ports.get_mut(getter_id as usize);
 
            putter_port.peer_id = getter_port.self_id;
 
            getter_port.peer_id = putter_port.self_id;
 
        }
 

	
 
        return Channel{
 
            putter_id: PortIdLocal::new(putter_id),
 
            getter_id: PortIdLocal::new(getter_id),
 
        }
 
    }
 
}
 

	
 
pub struct PortRef<'p> {
 
    lock: RwLockReadGuard<'p, PortStoreInner>,
 
    port: &'static mut Port,
 
}
 

	
 
impl<'p> std::ops::Deref for PortRef<'p> {
 
    type Target = Port;
 

	
 
    fn deref(&self) -> &Self::Target {
 
        return self.port;
 
    }
 
}
 

	
 
impl<'p> std::ops::DerefMut for PortRef<'p> {
 
    fn deref_mut(&mut self) -> &mut Self::Target {
 
        return self.port;
 
    }
 
}
 

	
 
impl Drop for PortStore {
 
    fn drop(&mut self) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        // Very lazy code
 
        for idx in 0..lock.ports.len() {
 
            if lock.free.contains(&idx) {
 
                continue;
 
            }
 

	
 
            unsafe {
 
                let port = lock.ports.get_mut(idx);
 
                std::ptr::drop_in_place(port);
 
            }
 
        }
 
    }
 
}
 

	
 
/// Global store of connectors, ports and queues that are used by the sceduler
 
/// threads. The global store has the appearance of a thread-safe datatype, but
 
/// one needs to be careful using it.
 
@@ -335,7 +227,6 @@ impl Drop for PortStore {
 
pub struct GlobalStore {
 
    pub connector_queue: MpmcQueue<ConnectorKey>,
 
    pub connectors: ConnectorStore,
 
    pub ports: PortStore,
 
    pub should_exit: AtomicBool,    // signal threads to exit
 
}
 

	
 
@@ -344,7 +235,6 @@ impl GlobalStore {
 
        Self{
 
            connector_queue: MpmcQueue::with_capacity(256),
 
            connectors: ConnectorStore::with_capacity(256),
 
            ports: PortStore::with_capacity(256),
 
            should_exit: AtomicBool::new(false),
 
        }
 
    }
src/runtime2/inbox.rs
Show inline comments
 
@@ -20,29 +20,11 @@ use crate::protocol::eval::ValueGroup;
 
use super::connector::{BranchId, PortIdLocal};
 
use super::global_store::ConnectorId;
 

	
 
/// A message prepared by a connector. Waiting to be picked up by the runtime to
 
/// be sent to another connector.
 
#[derive(Clone)]
 
pub struct OutgoingDataMessage {
 
    pub sending_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
 
    pub sender_cur_branch_id: BranchId, // always valid
 
    pub message: ValueGroup,
 
}
 

	
 
pub enum OutgoingMessage {
 
    Data(OutgoingDataMessage),
 
    Sync(SyncMessage),
 
    Solution(SolutionMessage),
 
}
 

	
 
/// A message that has been delivered (after being imbued with the receiving
 
/// port by the scheduler) to a connector.
 
#[derive(Clone)]
 
pub struct DataMessage {
 
    pub sending_connector: ConnectorId,
 
    pub sending_port: PortIdLocal,
 
    pub receiving_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId,
 
    pub sender_cur_branch_id: BranchId,
 
    pub message: ValueGroup,
 
@@ -186,14 +168,16 @@ impl SyncMessage {
 
}
 

	
 
pub struct SolutionMessage {
 
    pub comparison_number: u64,
 
    pub connector_origin: ConnectorId,
 
    pub local_solutions: Vec<(ConnectorId, BranchId)>,
 
    pub to_visit: Vec<ConnectorId>,
 
}
 

	
 
/// A control message. These might be sent by the scheduler to notify eachother
 
/// of asynchronous state changes.
 
pub struct ControlMessage {
 
    pub id: u32, // generic identifier, used to match request to response
 
    pub sender: ConnectorId,
 
    pub content: ControlMessageVariant,
 
}
 

	
 
@@ -202,15 +186,21 @@ pub enum ControlMessageVariant {
 
    Ack, // acknowledgement of previous control message, matching occurs through control message ID.
 
}
 

	
 
/// Generic message in the `PublicInbox`, handled by the scheduler (which takes
 
/// out and handles all control message and potential routing). The correctly
 
/// addressed `Data` variants will end up at the connector.
 
pub enum Message {
 
    Data(DataMessage),          // data message, handled by connector
 
    Sync(SyncMessage),          // sync message, handled by both connector/scheduler
 
    Solution(SolutionMessage),  // solution message, finishing a sync round
 
    Control(ControlMessage),    // control message, handled by scheduler
 
    Ping,                       // ping message, intentionally waking up a connector (used for native connectors)
 
/// Generic message contents.
 
#[derive(Clone)]
 
pub enum MessageContents {
 
    Data(DataMessage),              // data message, handled by connector
 
    Sync(SyncMessage),              // sync message, handled by both connector/scheduler
 
    RequestCommit(SolutionMessage), // solution message, requesting participants to commit
 
    ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to
 
    Control(ControlMessage),        // control message, handled by scheduler
 
    Ping,                           // ping message, intentionally waking up a connector (used for native connectors)
 
}
 

	
 
pub struct Message {
 
    pub sending_connector: ConnectorId,
 
    pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector)
 
    pub contents: MessageContents,
 
}
 

	
 
/// The public inbox of a connector. The thread running the connector that owns
src/runtime2/native.rs
Show inline comments
 
@@ -7,6 +7,9 @@ use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::port::{Port, PortKind};
 
use crate::runtime2::scheduler::ConnectorCtx;
 

	
 
use super::RuntimeInner;
 
use super::global_store::{ConnectorVariant, ConnectorId};
 
@@ -14,14 +17,19 @@ use super::port::{Channel, PortIdLocal};
 
use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, DataMessage, SyncMessage};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub trait Connector {
 
    fn insert_data_message(&mut self, message: DataMessage);
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState);
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
    /// Handle a new message (preprocessed by the scheduler). You probably only
 
    /// want to handle `Data`, `Sync`, and `Solution` messages. The others are
 
    /// intended for the scheduler itself.
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 

	
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>,
 
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewConnector(ConnectorPDL),
 
@@ -47,15 +55,11 @@ impl ConnectorApplication {
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) {
 
        todo!("handling sync messages in ApplicationConnector");
 
    }
 

	
 
    fn insert_data_message(&mut self, message: DataMessage)  {
 
        todo!("handling messages in ApplicationConnector");
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        todo!("handling messages in ConnectorApplication (API for runtime)")
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop() {
 
            match job {
 
@@ -90,9 +94,18 @@ impl ApplicationInterface {
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
        let channel = self.runtime.global_store.ports.create_channel(self.connector_id);
 
        self.owned_ports.push(channel.putter_id);
 
        self.owned_ports.push(channel.getter_id);
 
        // TODO: Duplicated logic in scheduler
 
        let getter_id = self.runtime.global_store.connectors.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            owning_connector: self.connector_id,
 
            peer_connector: self.connector_id,
 
        });
 

	
 
        return channel;
 
    }
src/runtime2/port.rs
Show inline comments
 
use super::global_store::ConnectorId;
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct PortIdLocal {
 
    pub index: u32,
 
}
 
@@ -26,22 +26,13 @@ pub enum PortKind {
 
    Getter,
 
}
 

	
 
pub enum PortOwnership {
 
    Unowned, // i.e. held by a native application
 
    Owned,
 
    InTransit,
 
}
 

	
 
/// Represents a port inside of the runtime. May be without owner if it is
 
/// created by the application interfacing with the runtime, instead of being
 
/// created by a connector.
 
pub struct Port {
 
    // Once created, these values are immutable
 
    pub self_id: PortIdLocal,
 
    pub peer_id: PortIdLocal,
 
    pub kind: PortKind,
 
    // But this can be changed, but only by the connector that owns it
 
    pub ownership: PortOwnership,
 
    pub owning_connector: ConnectorId,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase.
 
}
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::Condvar;
 
use std::sync::atomic::Ordering;
 
use std::sync::atomic::{AtomicU32, Ordering};
 
use std::time::Duration;
 
use std::thread;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::global_store::ConnectorVariant;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::{Channel, PortKind, PortOwnership};
 

	
 
use super::RuntimeInner;
 
use super::port::{PortIdLocal};
 
use super::port::{Port, PortIdLocal};
 
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
 
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
 

	
 
/// Contains fields that are mostly managed by the scheduler, but may be
 
/// accessed by the connector
 
pub(crate) struct ConnectorCtx {
 
    pub(crate) id: ConnectorId,
 
    port_counter: Arc<AtomicU32>,
 
    pub(crate) ports: Vec<Port>,
 
}
 

	
 
impl ConnectorCtx {
 
    pub(crate) fn new(port_counter: Arc<AtomicU32>) -> ConnectorCtx {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            port_counter,
 
            ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a (putter, getter) port pair belonging to the same channel. The
 
    /// port will be implicitly owned by the connector.
 
    pub(crate) fn create_channel(&mut self) -> Channel {
 
        let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            owning_connector: self.id,
 
            peer_connector: self.id,
 
        });
 

	
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            owning_connector: self.id,
 
            peer_connector: self.id,
 
        });
 

	
 
        return Channel{ getter_id, putter_id };
 
    }
 

	
 
    pub(crate) fn add_port(&mut self, port: Port) {
 
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
 
        self.ports.push(port);
 
    }
 

	
 
    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
 
        let index = self.port_id_to_index(id);
 
        return self.ports.remove(index);
 
    }
 

	
 
    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
 
        let index = self.port_id_to_index(id);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
 
        let index = self.port_id_to_index(id);
 
        return &mut self.ports[index];
 
    }
 

	
 
    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
 
        for (idx, port) in self.ports.iter().enumerate() {
 
            if port.self_id == id {
 
                return idx;
 
            }
 
        }
 

	
 
        panic!("port {:?}, not owned by connector", id);
 
    }
 
}
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
}
 
@@ -57,76 +133,64 @@ impl Scheduler {
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    // TODO: Put header in front of messages, this is a mess
 
                    match message {
 
                        Message::Data(message) => {
 
                    match message.contents {
 
                        MessageContents::Data(content) => {
 
                            // Check if we need to reroute, or can just put it
 
                            // in the private inbox of the connector
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.sending_port) {
 
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message));
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, content.sending_port) {
 
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(content));
 
                            } else {
 
                                scheduled.connector.insert_data_message(message);
 
                                scheduled.connector.insert_data_message(content);
 
                            }
 
                        }
 
                        MessageContents::Sync(content) => {
 
                            scheduled.connector.insert_sync_message(content, &scheduled.context, &mut delta_state);
 
                        }
 
                        MessageContents::Solution(content) => {
 
                            // TODO: Handle solution message
 
                        },
 
                        Message::Sync(message) => {
 
                            // TODO: Come back here after rewriting port ownership stuff
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute()
 
                        },
 
                        Message::Solution(solution) => {
 

	
 
                        },
 
                        Message::Control(message) => {
 
                            match message.content {
 
                        MessageContents::Control(content) => {
 
                            match content.content {
 
                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                    // Need to change port target
 
                                    let port = self.runtime.global_store.ports.get(&connector_key, port_id);
 
                                    let port = scheduled.context.get_port_mut(port_id);
 
                                    port.peer_connector = new_target_connector_id;
 
                                    debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                    // And respond with an Ack
 
                                    // Note: after this code has been reached, we may not have any
 
                                    // messages in the outbox that send to the port whose owning
 
                                    // connector we just changed. This is because the `ack` will
 
                                    // clear the rerouting entry of the `ack`-receiver.
 
                                    self.send_message_and_wake_up_if_sleeping(
 
                                        message.sender,
 
                                        Message::Control(ControlMessage{
 
                                            id: message.id,
 
                                            sender: connector_key.downcast(),
 
                                            content: ControlMessageVariant::Ack,
 
                                        })
 
                                        content.sender,
 
                                        Message{
 
                                            sending_connector: connector_key.downcast(),
 
                                            receiving_port: PortIdLocal::new_invalid(),
 
                                            contents: MessageContents::Control(ControlMessage{
 
                                                id: content.id,
 
                                                content: ControlMessageVariant::Ack,
 
                                            }),
 
                                        }
 
                                    );
 
                                },
 
                                ControlMessageVariant::Ack => {
 
                                    scheduled.router.handle_ack(message.id);
 
                                    scheduled.router.handle_ack(content.id);
 
                                }
 
                            }
 
                        },
 
                        }
 
                        Message::Ping => {},
 
                    }
 
                }
 

	
 
                // Actually run the connector
 
                // TODO: Revise
 
                let new_schedule;
 
                match &mut scheduled.connector {
 
                    ConnectorVariant::UserDefined(connector) => {
 
                        if connector.is_in_sync_mode() {
 
                            // In synchronous mode, so we can expect messages being sent,
 
                            // but we never expect the creation of connectors
 
                            new_schedule = connector.run_in_speculative_mode(&self.runtime.protocol_description, &mut delta_state);
 
                            debug_assert!(delta_state.new_connectors.is_empty());
 
                        } else {
 
                            // In regular running mode (not in a sync block) we cannot send
 
                            // messages but we can create new connectors
 
                            new_schedule = connector.run_in_deterministic_mode(&self.runtime.protocol_description, &mut delta_state);
 
                            debug_assert!(delta_state.outbox.is_empty());
 
                        }
 
                    },
 
                    ConnectorVariant::Native(connector) => {
 
                        new_schedule = connector.run(&self.runtime.protocol_description);
 
                    },
 
                }
 
                let new_schedule = scheduled.connector.run(
 
                    &self.runtime.protocol_description, &scheduled.context, &mut delta_state
 
                );
 

	
 
                // Handle all of the output from the current run: messages to
 
                // send and connectors to instantiate.
 
                self.handle_delta_state(&connector_key, &mut delta_state);
 
                self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);
 

	
 
                cur_schedule = new_schedule;
 
            }
 
@@ -164,26 +228,54 @@ impl Scheduler {
 
        }
 
    }
 

	
 
    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, delta_state: &mut RunDeltaState) {
 
    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        // Handling any messages that were sent
 
        let connector_id = connector_key.downcast();
 

	
 
        if !delta_state.outbox.is_empty() {
 
            for message in delta_state.outbox.drain(..) {
 
                let (inbox_message, target_connector_id) = {
 
                    let sending_port = self.runtime.global_store.ports.get(&connector_key, message.sending_port);
 
                    (
 
                        DataMessage {
 
                            sending_connector: connector_key.downcast(),
 
                            sending_port: sending_port.self_id,
 
                            receiving_port: sending_port.peer_id,
 
                            sender_prev_branch_id: message.sender_prev_branch_id,
 
                            sender_cur_branch_id: message.sender_cur_branch_id,
 
                            message: message.message,
 
                        },
 
                        sending_port.peer_connector,
 
                    )
 
            for mut message in delta_state.outbox.drain(..) {
 
                // Based on the message contents, decide where the message
 
                // should be sent to. This might end up modifying the message.
 
                let (peer_connector, peer_port) = match &mut message {
 
                    MessageContents::Data(contents) => {
 
                        let port = context.get_port(contents.sending_port);
 
                        (port.peer_connector, port.peer_id)
 
                    },
 
                    MessageContents::Sync(contents) => {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::RequestCommit(contents)=> {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::ConfirmCommit(contents) => {
 
                        for to_visit in &contents.to_visit {
 
                            let message = Message{
 
                                sending_connector: connector_id,
 
                                receiving_port: PortIdLocal::new_invalid(),
 
                                contents: contents.clone(),
 
                            };
 
                            self.send_message_and_wake_up_if_sleeping(*to_visit, message);
 
                        }
 
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::Control(_) | MessageContents::Ping => {
 
                        // Never generated by the user's code
 
                        unreachable!();
 
                    }
 
                };
 

	
 
                self.send_message_and_wake_up_if_sleeping(target_connector_id, Message::Data(inbox_message));
 
                // TODO: Maybe clean this up, perhaps special case for
 
                //  ConfirmCommit can be handled differently.
 
                if peer_connector.is_valid() {
 
                    let message = Message {
 
                        sending_connector: connector_id,
 
                        receiving_port: peer_port,
 
                        contents: message,
 
                    };
 
                    self.send_message_and_wake_up_if_sleeping(peer_connector, message);
 
                }
 
            }
 
        }
 

	
 
@@ -194,25 +286,16 @@ impl Scheduler {
 

	
 
            for new_connector in delta_state.new_connectors.drain(..) {
 
                // Add to global registry to obtain key
 
                let new_key = self.runtime.global_store.connectors.create(ConnectorVariant::UserDefined(new_connector));
 
                let new_key = self.runtime.global_store.connectors.create(cur_connector, ConnectorVariant::UserDefined(new_connector));
 
                let new_connector = self.runtime.global_store.connectors.get_mut(&new_key);
 

	
 
                // Each port should be lost by the connector that created the
 
                // new one. Note that the creator is the current owner.
 
                for port_id in &new_connector.ports.owned_ports {
 
                    debug_assert!(!cur_connector.ports.owned_ports.contains(port_id));
 

	
 
                    // Modify ownership, retrieve peer connector
 
                    let (peer_connector_id, peer_port_id) = {
 
                        let mut port = self.runtime.global_store.ports.get(connector_key, *port_id);
 
                        port.owning_connector = new_key.downcast();
 

	
 
                        (port.peer_connector, port.peer_id)
 
                    };
 

	
 
                    // Send message that port has changed ownership
 
                // Call above changed ownership of ports, but we still have to
 
                // let the other end of the channel know that the port has
 
                // changed location.
 
                for port in &new_connector.context.ports {
 
                    let reroute_message = cur_connector.router.prepare_reroute(
 
                        port_id, peer_port_id, connector_key.downcast(), peer_connector_id, new_key.downcast()
 
                        port.self_id, port.peer_id, cur_connector.context.id,
 
                        port.peer_connector, new_connector.context.id
 
                    );
 

	
 
                    self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message);
 
@@ -282,7 +365,6 @@ impl Router {
 

	
 
        return Message::Control(ControlMessage{
 
            id,
 
            sender: self_connector_id,
 
            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
 
        });
 
    }
0 comments (0 inline, 0 general)