Changeset - 15b9bb47abdc
[Not reviewed]
0 4 0
mh - 4 years ago 2021-10-28 11:27:30
contact@maxhenger.nl
WIP on rewriting execution ctx to fix recv bug
4 files changed with 169 insertions and 13 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::scheduler::Scheduler;
 
use crate::runtime2::scheduler::{ComponentCtxFancy, Scheduler};
 

	
 
use super::ConnectorId;
 
use super::native::Connector;
 
use super::scheduler::{SchedulerCtx, ConnectorCtx};
 
use super::inbox::{
 
    PrivateInbox, PublicInbox,
 
    DataMessage, SyncMessage, SolutionMessage, Message, MessageContents,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::{Port, PortKind, PortIdLocal};
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
// TODO: Remove Debug derive
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct BranchId {
 
    pub index: u32,
 
}
 

	
 
impl BranchId {
 
    fn new_invalid() -> Self {
 
        Self{ index: 0 }
 
    }
 

	
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        Self{ index }
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
    Finished,               // finished executing connector's code
 
    // Synchronous variants
 
    RunningInSync,          // running within a sync block
 
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
 
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 
@@ -365,214 +365,214 @@ impl<'a> RunContext for ConnectorRunContext<'a> {
 

	
 
        let port_index = self.ports.get_port_index(PortIdLocal::new(port.0.u32_suffix)).unwrap();
 
        let mapping = self.ports.get_port(self.branch_index, port_index);
 
        return mapping.is_assigned;
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        match self.received.get(&port_id) {
 
            Some(message) => Some(message.message.clone()),
 
            None => None,
 
        }
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        if self.ports_delta.iter().any(|v| v.port_id == port_id) {
 
            return None
 
        }
 

	
 
        let port_index = self.ports.get_port_index(port_id).unwrap();
 
        let mapping = self.ports.get_port(self.branch_index, port_index);
 

	
 
        if mapping.is_assigned {
 
            return Some(Value::Bool(mapping.num_times_fired != 0));
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        return self.prepared_channel.take();
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message.contents {
 
            MC::Data(content) => self.handle_data_message(message.receiving_port, content),
 
            MC::Sync(content) => self.handle_sync_message(content, ctx, delta_state),
 
            MC::RequestCommit(content) => self.handle_request_commit_message(content, ctx, delta_state),
 
            MC::ConfirmCommit(content) => self.handle_confirm_commit_message(content, ctx, delta_state),
 
            MC::Control(_) | MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            // Check for new messages we haven't seen before. If any of the
 
            // pending branches can accept the message, do so.
 
            while let Some((target_port_id, message)) = self.inbox.next_message() {
 
            while let Some((target_port_id, message)) = comp_ctx.read_next_message() {
 
                let mut branch_idx = self.sync_pending_get.first;
 
                while branch_idx != 0 {
 
                    let branch = &self.branches[branch_idx as usize];
 
                    let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0);
 

	
 
                    let target_port_index = self.ports.get_port_index(*target_port_id).unwrap();
 
                    let port_mapping = self.ports.get_port(branch_idx, target_port_index);
 

	
 
                    if branch.sync_state == SpeculativeState::HaltedAtBranchPoint &&
 
                        branch.halted_at_port == *target_port_id &&
 
                        port_mapping.last_registered_branch_id == message.sender_prev_branch_id {
 
                        // Branch may accept this mesage, so create a fork that
 
                        // contains this message in the inbox.
 
                        let new_branch_idx = self.branches.len() as u32;
 
                        let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch);
 

	
 
                        self.ports.prepare_sync_branch(branch_idx, new_branch_idx);
 
                        let mapping = self.ports.get_port_mut(branch_idx, target_port_index);
 
                        mapping.last_registered_branch_id = message.sender_cur_branch_id;
 

	
 
                        let new_branch_id = BranchId::new(new_branch_idx);
 
                        self.branches.push(new_branch);
 
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id)
 
                    }
 

	
 
                    branch_idx = next_branch_idx;
 
                }
 
            }
 

	
 
            let scheduling = self.run_in_speculative_mode(sched_ctx, conn_ctx, delta_state);
 
            let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx, conn_ctx, delta_state);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                // Retrieve first element in queue
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
                    next_id = self.sync_finished.first;
 
                } else {
 
                    let last_handled = &self.branches[self.sync_finished_last_handled as usize];
 
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                loop {
 
                    let branch_id = BranchId::new(next_id);
 
                    let branch = &self.branches[next_id as usize];
 
                    let branch_next = branch.next_branch_in_queue;
 

	
 
                    // Turn local solution into a message and send it along
 
                    // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, conn_ctx);
 
                    if let Some(valid_solution) = solution_message {
 
                        self.submit_sync_solution(valid_solution, conn_ctx, delta_state);
 
                    } else {
 
                        // Branch is actually invalid, but we only just figured
 
                        // it out. We need to mark it as invalid to prevent
 
                        // future use
 
                        Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id);
 
                        if branch_id.index == self.sync_finished_last_handled {
 
                            self.sync_finished_last_handled = self.sync_finished.last;
 
                        }
 

	
 
                        let branch = &mut self.branches[next_id as usize];
 
                        branch.sync_state = SpeculativeState::Inconsistent;
 
                    }
 

	
 
                    match branch_next {
 
                        Some(id) => next_id = id,
 
                        None => break,
 
                    }
 
                }
 

	
 
                self.sync_finished_last_handled = next_id;
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, conn_ctx, delta_state);
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx, conn_ctx, delta_state);
 
            return scheduling;
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    /// Constructs a representation of a connector. The assumption is that the
 
    /// initial branch is at the first instruction of the connector's code,
 
    /// hence is in a non-sync state.
 
    pub fn new(initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            sync_finished_last_handled: 0, // none at all
 
            cur_round: 0,
 
            committed_to: None,
 
            inbox: PrivateInbox::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
        }
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling connector messages
 
    // -------------------------------------------------------------------------
 

	
 
    pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
 
        self.inbox.insert_message(target_port, message);
 
        // self.inbox.insert_message(target_port, message);
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate.
 
    pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints
 

	
 
        // TODO: Optimize, use some kind of temp workspace vector
 
        let mut execution_path_branch_ids = Vec::new();
 

	
 
        if self.sync_finished_last_handled != 0 {
 
            // We have some solutions to match against
 
            let constraints_index = message.constraints
 
                .iter()
 
                .position(|v| v.connector_id == ctx.id)
 
                .unwrap();
 
            let constraints = &message.constraints[constraints_index].constraints;
 
            debug_assert!(!constraints.is_empty());
 

	
 
            // Note that we only iterate over the solutions we've already
 
            // handled ourselves, not necessarily
 
            let mut branch_index = self.sync_finished.first;
 
            'branch_loop: loop {
 
                // Load solution branch
 
                let branch = &self.branches[branch_index as usize];
 
                execution_path_branch_ids.clear();
 
                self.branch_ids_of_execution_path(BranchId::new(branch_index), &mut execution_path_branch_ids);
 

	
 
                // Check if the branch matches all of the applied constraints
 
                for constraint in constraints {
 
                    match constraint {
 
                        SyncBranchConstraint::SilentPort(silent_port_id) => {
 
                            let port_index = self.ports.get_port_index(*silent_port_id);
 
                            if port_index.is_none() {
 
                                // Nefarious peer
 
                                continue 'branch_loop;
 
                            }
 
                            let port_index = port_index.unwrap();
 

	
 
                            let mapping = self.ports.get_port(branch_index, port_index);
 
                            debug_assert!(mapping.is_assigned);
 

	
 
                            if mapping.num_times_fired != 0 {
 
                                // Not silent, constraint not satisfied
 
                                continue 'branch_loop;
 
                            }
 
                        },
 
@@ -719,97 +719,97 @@ impl ConnectorPDL {
 
        assert_eq!(message.connector_origin, expected_connector_id);
 
        assert_eq!(message.comparison_number, expected_comparison_number);
 

	
 
        // Find the branch we're supposed to commit to
 
        let (_, branch_id) = message.local_solutions
 
            .iter()
 
            .find(|(id, _)| *id == ctx.id)
 
            .unwrap();
 
        let branch_id = *branch_id;
 

	
 
        // Commit to the branch. That is: move the solution branch to the first
 
        // of the connector's branches
 
        self.in_sync = false;
 
        self.branches.swap(0, branch_id.index as usize);
 
        self.branches.truncate(1); // TODO: Or drain and do not deallocate?
 
        let solution = &mut self.branches[0];
 

	
 
        // Clear all of the other sync-related variables
 
        self.sync_active.clear();
 
        self.sync_pending_get.clear();
 
        self.sync_finished.clear();
 
        self.sync_finished_last_handled = 0;
 
        self.cur_round += 1;
 

	
 
        self.committed_to = None;
 
        self.inbox.clear();
 
        self.ports.commit_to_sync();
 

	
 
        // Add/remove any of the ports we lost during the sync phase
 
        for port_delta in &solution.ports_delta {
 
            if port_delta.acquired {
 
                self.ports.add_port(port_delta.port_id);
 
            } else {
 
                self.ports.remove_port(port_delta.port_id);
 
            }
 
        }
 
        solution.commit_to_sync();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Executing connector code
 
    // -------------------------------------------------------------------------
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 

	
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        debug_assert!(branch.prepared_channel.is_none());
 
        let mut run_context = ConnectorRunContext {
 
            branch_index: branch.index.index,
 
            ports: &self.ports,
 
            ports_delta: &branch.ports_delta,
 
            scheduler: sched_ctx,
 
            prepared_channel: None,
 
            received: &branch.received,
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        // Match statement contains `return` statements only if the particular
 
        // run result behind handled requires an immediate re-run of the
 
        // connector.
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that does not yet have an
 
                // assigned speculative value. So we need to create those
 
                // branches
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 

	
 
                debug_assert!(self.ports.owned_ports.contains(&local_port_id));
 

	
 
                // Create two copied branches, one silent and one firing
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                let parent_branch_id = branch.index;
 
                let parent_branch = &self.branches[parent_branch_id.index as usize];
 

	
 
                let silent_index = self.branches.len() as u32;
 
                let firing_index = silent_index + 1;
 

	
 
                let silent_branch = Branch::new_sync_branching_from(silent_index, parent_branch);
 
                self.ports.prepare_sync_branch(parent_branch.index.index, silent_index);
 

	
 
@@ -819,97 +819,97 @@ impl ConnectorPDL {
 
                // Assign the port values of the two new branches
 
                let silent_port = self.ports.get_port_mut(silent_index, local_port_index);
 
                silent_port.mark_speculative(0);
 

	
 
                let firing_port = self.ports.get_port_mut(firing_index, local_port_index);
 
                firing_port.mark_speculative(1);
 

	
 
                // Run both branches again
 
                let silent_branch_id = silent_branch.index;
 
                self.branches.push(silent_branch);
 
                let firing_branch_id = firing_branch.index;
 
                self.branches.push(firing_branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch_id);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
                // Branch performed a `get` on a port that has not yet received
 
                // a value in its inbox.
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id);
 
                if local_port_index.is_none() {
 
                    todo!("deal with the case where the port is acquired");
 
                }
 
                let local_port_index = local_port_index.unwrap();
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 

	
 
                // Check for port mapping assignment and, if present, if it is
 
                // consistent
 
                let is_valid_get = if port_mapping.is_assigned {
 
                    assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires`
 
                    port_mapping.num_times_fired == 1
 
                } else {
 
                    // Not yet assigned
 
                    port_mapping.mark_speculative(1);
 
                    true
 
                };
 

	
 
                if is_valid_get {
 
                    // Mark as a branching point for future messages
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    branch.halted_at_port = local_port_id;
 
                    let branch_id = branch.index;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id);
 

	
 
                    // But if some messages can be immediately applied, do so
 
                    // now.
 
                    let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    let messages = comp_ctx.get_read_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    let mut did_have_messages = false;
 

	
 
                    for message in messages {
 
                        did_have_messages = true;
 

	
 
                        // For each message prepare a new branch to execute
 
                        let parent_branch = &self.branches[branch_id.index as usize];
 
                        let new_branch_index = self.branches.len() as u32;
 
                        let mut new_branch = Branch::new_sync_branching_from(new_branch_index, parent_branch);
 
                        self.ports.prepare_sync_branch(branch_id.index, new_branch_index);
 

	
 
                        let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index);
 
                        port_mapping.last_registered_branch_id = message.sender_cur_branch_id;
 
                        debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1);
 

	
 
                        new_branch.received.insert(local_port_id, message.clone());
 

	
 
                        // If the message contains any ports then they will now
 
                        // be owned by the new branch
 
                        debug_assert!(results.ports.is_empty());
 
                        find_ports_in_value_group(&message.message, &mut results.ports);
 
                        Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports);
 
                        results.ports.clear();
 

	
 
                        // Schedule the new branch
 
                        debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync);
 
                        let new_branch_id = new_branch.index;
 
                        self.branches.push(new_branch);
 
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id);
 
                    }
 

	
 
                    if did_have_messages {
 
                        // If we did create any new branches, then we can run
 
                        // them immediately.
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Branch is done, go through all of the ports that are not yet
 
                // assigned and map them to non-firing.
 
                for port_idx in 0..self.ports.num_ports() {
 
                    let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx);
 
                    if !port_mapping.is_assigned {
 
                        port_mapping.mark_speculative(0);
 
                    }
 
@@ -941,97 +941,97 @@ impl ConnectorPDL {
 
                        // Valid if speculatively firing
 
                        port_mapping.num_times_fired == 1
 
                    }
 
                } else {
 
                    // Not yet assigned, do so now
 
                    true
 
                };
 

	
 
                if is_valid_put {
 
                    // Put in run results for thread to pick up and transfer to
 
                    // the correct connector inbox.
 
                    port_mapping.mark_definitive(branch.index, 1);
 
                    let message = DataMessage{
 
                        sending_port: local_port_id,
 
                        sender_prev_branch_id: BranchId::new_invalid(),
 
                        sender_cur_branch_id: branch.index,
 
                        message: value_group,
 
                    };
 

	
 
                    // If the message contains any ports then we release our
 
                    // ownership over them in this branch
 
                    debug_assert!(results.ports.is_empty());
 
                    find_ports_in_value_group(&message.message, &mut results.ports);
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports).unwrap();
 
                    results.ports.clear();
 

	
 
                    results.outbox.push(MessageContents::Data(message));
 

	
 
                    let branch_index = branch.index;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, branch_index);
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result),
 
        }
 

	
 
        // Not immediately scheduling, so schedule again if there are more
 
        // branches to run
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_index: branch.index.index,
 
            ports: &self.ports,
 
            ports_delta: &branch.ports_delta,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
            received: &branch.received,
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                // Need to wait until all children are terminated
 
                // TODO: Think about how to do this?
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::Exit;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution and reschedule immediately
 
                self.in_sync = true;
 
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
 
                let first_sync_branch_id = first_sync_branch.index;
 
                self.ports.prepare_sync_branch(0, 1);
 
                self.branches.push(first_sync_branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch_id);
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Construction of a new component. Find all references to ports
 
                // inside of the arguments
 
                debug_assert!(results.ports.is_empty());
 
                find_ports_in_value_group(&arguments, &mut results.ports);
 

	
 
                if !results.ports.is_empty() {
 
                    // Ports changing ownership
 
                    if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &results.ports) {
 
                        todo!("fatal error handling");
 
                    }
 
                }
 

	
src/runtime2/native.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::Ordering;
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::scheduler::ComponentCtxFancy;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx};
 
use super::scheduler::SchedulerCtx;
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::connector::{Branch, ConnectorScheduling, RunDeltaState, ConnectorPDL};
 
use super::connector::find_ports_in_value_group;
 
use super::inbox::{Message, MessageContents};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
    /// Handle a new message (preprocessed by the scheduler). You probably only
 
    /// want to handle `Data`, `Sync`, and `Solution` messages. The others are
 
    /// intended for the scheduler itself.
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 

	
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL),
 
    Shutdown,
 
}
 

	
 
/// The connector which an application can directly interface with. Once may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
pub struct ConnectorApplication {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn handle_message(&mut self, message: Message, _ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message.contents {
 
            MC::Data(_) => unreachable!("data message in API connector"),
 
            MC::Sync(_) | MC::RequestCommit(_) | MC::ConfirmCommit(_) => {
 
                // Handling sync in API
 
            },
 
            MC::Control(_) => {},
 
            MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, _sched_ctx: SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    println!("DEBUG: API adopting ports");
 
                    delta_state.new_ports.reserve(2);
 
                    delta_state.new_ports.push(endpoint_a);
 
                    delta_state.new_ports.push(endpoint_b);
 
                }
 
                ApplicationJob::NewConnector(connector) => {
 
                    println!("DEBUG: API creating connector");
 
                    delta_state.new_connectors.push(connector);
 
                },
 
                ApplicationJob::Shutdown => {
 
                    debug_assert!(queue.is_empty());
 
                    return ConnectorScheduling::Exit;
 
                }
 
            }
 
        }
 

	
 
        return ConnectorScheduling::NotNow;
 
    }
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<PortIdLocal>,
 
}
 

	
 
impl ApplicationInterface {
 
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
        let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id);
 
        debug_assert_eq!(getter_port.kind, PortKind::Getter);
 
        let getter_id = getter_port.self_id;
src/runtime2/port.rs
Show inline comments
 
use super::ConnectorId;
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 
pub struct PortIdLocal {
 
    pub index: u32,
 
}
 

	
 
impl PortIdLocal {
 
    pub fn new(id: u32) -> Self {
 
        Self{ index: id }
 
    }
 

	
 
    // TODO: Unsure about this, maybe remove, then also remove all struct
 
    //  instances where I call this
 
    pub fn new_invalid() -> Self {
 
        Self{ index: u32::MAX }
 
    }
 

	
 
    pub fn is_valid(&self) -> bool {
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum PortState {
 
    Open,
 
    Closed,
 
}
 

	
 
/// Represents a port inside of the runtime. This is generally the local view of
 
/// a connector on its port, which may not be consistent with the rest of the
 
/// global system (e.g. its peer was moved to a new connector, or the peer might
 
/// have died in the meantime, so it is no longer usable).
 
#[derive(Clone)]
 
pub struct Port {
 
    pub self_id: PortIdLocal,
 
    pub peer_id: PortIdLocal,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase
 
}
 

	
 

	
 

	
 
// TODO: Turn port ID into its own type
 
pub struct Channel {
 
    pub putter_id: PortIdLocal, // can put on it, so from the connector's point of view, this is an output
 
    pub getter_id: PortIdLocal, // vice versa: can get on it, so an input for the connector
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::runtime2::ScheduledConnector;
 
use crate::runtime2::connector::{BranchId, ConnectorPDL};
 
use crate::runtime2::inbox::{DataMessage, PrivateInbox};
 

	
 
use super::{RuntimeInner, ConnectorId, ConnectorKey};
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::connector::{ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};
 

	
 
/// Contains fields that are mostly managed by the scheduler, but may be
 
/// accessed by the connector
 
pub(crate) struct ConnectorCtx {
 
    pub(crate) id: ConnectorId,
 
    pub(crate) ports: Vec<Port>,
 
}
 

	
 
impl ConnectorCtx {
 
    pub(crate) fn new() -> ConnectorCtx {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn add_port(&mut self, port: Port) {
 
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
 
        self.ports.push(port);
 
    }
 

	
 
    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
 
        let index = self.port_id_to_index(id);
 
        return self.ports.remove(index);
 
    }
 

	
 
    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
 
        let index = self.port_id_to_index(id);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
 
        let index = self.port_id_to_index(id);
 
        return &mut self.ports[index];
 
    }
 

	
 
    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
 
        for (idx, port) in self.ports.iter().enumerate() {
 
            if port.self_id == id {
 
                return idx;
 
            }
 
        }
 

	
 
        panic!("port {:?}, not owned by connector", id);
 
@@ -321,96 +322,249 @@ impl Scheduler {
 
                    );
 

	
 
                    self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
 
                    self.runtime.send_message(port.peer_connector, reroute_message);
 
                }
 

	
 
                // Schedule new connector to run
 
                self.runtime.push_work(new_key);
 
            }
 
        }
 

	
 
        debug_assert!(delta_state.outbox.is_empty());
 
        debug_assert!(delta_state.new_ports.is_empty());
 
        debug_assert!(delta_state.new_connectors.is_empty());
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.context.id.0);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
        connector.public.sleeping.store(true, Ordering::Release);
 

	
 
        // But do to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up
 
            let should_wake_up_again = connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
            if should_wake_up_again {
 
                self.runtime.push_work(connector_key)
 
            }
 
        }
 
    }
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentCtx
 
// -----------------------------------------------------------------------------
 

	
 
enum ComponentStateChange {
 
    CreatedComponent(ConnectorPDL),
 
    CreatedPort(Port),
 
    ChangedPort(ComponentPortChange),
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) enum ComponentPortChange {
 
    Acquired(Port),
 
    Released(Port),
 
}
 

	
 
struct InboxMessage {
 
    target_port: PortIdLocal,
 
    data: DataMessage,
 
}
 

	
 
/// The component context (better name may be invented). This was created
 
/// because part of the component's state is managed by the scheduler, and part
 
/// of it by the component itself. When the component starts a sync block or
 
/// exits a sync block the partially managed state by both component and
 
/// scheduler need to be exchanged.
 
pub(crate) struct ComponentCtxFancy {
 
    // Mostly managed by the scheduler
 
    id: ConnectorId,
 
    ports: Vec<Port>,
 
    inbox_messages: Vec<InboxMessage>,
 
    inbox_len_read: usize,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: Vec<MessageContents>,
 
    state_changes: Vec<ComponentStateChange>
 
}
 

	
 
impl ComponentCtxFancy {
 
    /// Notify the runtime that the component has created a new component. May
 
    /// only be called outside of a sync block.
 
    pub(crate) fn push_component(&mut self, component: ConnectorPDL) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push(ComponentStateChange::CreatedComponent(component));
 
    }
 

	
 
    /// Notify the runtime that the component has created a new port. May only
 
    /// be called outside of a sync block (for ports received during a sync
 
    /// block, pass them when calling `notify_sync_end`).
 
    pub(crate) fn push_port(&mut self, port: Port) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push(ComponentStateChange::CreatedPort(port))
 
    }
 

	
 
    /// Notify that component will enter a sync block.
 
    pub(crate) fn notify_sync_start(&mut self) -> &[Port] {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        self.is_in_sync = true;
 
        self.changed_in_sync = true;
 
        return &self.ports
 
    }
 

	
 
    /// Submit a message for the scheduler to send to the appropriate receiver.
 
    /// May only be called inside of a sync block.
 
    pub(crate) fn submit_message(&mut self, contents: MessageContents) {
 
        debug_assert!(self.is_in_sync);
 
        self.outbox.push(contents);
 
    }
 

	
 
    /// Notify that component just finished a sync block.
 
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.is_in_sync = false;
 
        self.changed_in_sync = true;
 

	
 
        self.state_changes.reserve(changed_ports.len());
 
        for changed_port in changed_ports {
 
            self.state_changes.push(ComponentStateChange::ChangedPort(changed_port.clone()));
 
        }
 
    }
 

	
 
    /// Inserts message into inbox. Generally only called by scheduler.
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, data: DataMessage) {
 
        debug_assert!(!self.inbox_messages.iter().any(|v| {
 
            v.target_port == target_port &&
 
                v.data.sender_prev_branch_id == data.sender_prev_branch_id &&
 
                v.data.sender_cur_branch_id == data.sender_cur_branch_id
 
        }));
 

	
 
        self.inbox_messages.push(InboxMessage{ target_port, data })
 
    }
 

	
 
    /// Retrieves messages matching a particular port and branch id. But only
 
    /// those messages that have been previously received with
 
    /// `read_next_message`.
 
    pub(crate) fn get_read_messages(&self, match_port_id: PortIdLocal, match_prev_branch_id: BranchId) -> MessagesIter {
 
        return MessageIter {
 
            messages: &self.inbox_messages,
 
            next_index: 0,
 
            max_index: self.inbox_len_read,
 
            match_port_id, match_prev_branch_id
 
        };
 
    }
 

	
 
    /// Retrieves the next unread message from the inbox `None` if there are no
 
    /// (new) messages to read.
 
    pub(crate) fn read_next_message(&mut self) -> Option<(&PortIdLocal, &DataMessage)> {
 
        if self.inbox_len_read == self.inbox_messages.len() {
 
            return None;
 
        }
 

	
 
        let message = &self.inbox_messages[self.inbox_len_read];
 
        self.inbox_len_read += 1;
 
        return Some((&message.target_port, &message.data))
 
    }
 
}
 

	
 
pub(crate) struct MessagesIter<'a> {
 
    messages: &'a [InboxMessage],
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
    match_prev_branch_id: BranchId,
 
}
 

	
 
impl Iterator for MessagesIter {
 
    type Item = DataMessage;
 

	
 
    fn next(&mut self) -> Option<&Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let message = &self.messages[self.next_index];
 
            if message.target_port == self.match_port_id && message.data.sender_prev_branch_id == self.match_prev_branch_id {
 
                // Found a match
 
                break;
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        if self.next_index == self.max_index {
 
            return None;
 
        }
 

	
 
        let message = &self.messages[self.next_index];
 
        self.next_index += 1;
 
        return Some(&message.data);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
struct ControlEntry {
 
    id: u32,
 
    variant: ControlVariant,
 
}
 

	
 
enum ControlVariant {
 
    ChangedPort(ControlChangedPort),
 
    ClosedChannel(ControlClosedChannel),
 
}
 

	
 
struct ControlChangedPort {
 
    target_port: PortIdLocal,       // if send to this port, then reroute
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector we need to reroute to
 
}
 

	
 
struct ControlClosedChannel {
 
    source_port: PortIdLocal,
 
    target_port: PortIdLocal,
 
}
 

	
 
pub(crate) struct ControlMessageHandler {
 
    id_counter: u32,
 
    active: Vec<ControlEntry>,
 
}
 

	
 
impl ControlMessageHandler {
 
    pub fn new() -> Self {
 
        ControlMessageHandler {
 
            id_counter: 0,
 
            active: Vec::new(),
 
        }
 
    }
 

	
 
    /// Prepares a message indicating that a channel has closed, we keep a local
 
    /// entry to match against the (hopefully) returned `Ack` message.
 
    pub fn prepare_closing_channel(
 
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId
 
    ) -> Message {
 
        let id = self.take_id();
 

	
 
        self.active.push(ControlEntry{
 
            id,
0 comments (0 inline, 0 general)