Changeset - 154e5e08b93a
[Not reviewed]
1 8 0
MH - 4 years ago 2021-11-01 12:45:53
contact@maxhenger.nl
rewrite of context structure to fix data-resend bug
9 files changed with 517 insertions and 657 deletions:
0 comments (0 inline, 0 general)
src/collections/mod.rs
Show inline comments
 
mod string_pool;
 
mod scoped_buffer;
 
mod sets;
 
mod mpmc_queue;
 
mod raw_vec;
 

	
 
// TODO: Finish this later, use alloc::alloc and alloc::Layout
 
@@ -10,5 +9,4 @@ mod raw_vec;
 
pub(crate) use string_pool::{StringPool, StringRef};
 
pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection};
 
pub(crate) use sets::DequeSet;
 
pub(crate) use mpmc_queue::MpmcQueue;
 
pub(crate) use raw_vec::RawVec;
 
\ No newline at end of file
src/collections/mpmc_queue.rs
Show inline comments
 
deleted file
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::PortId;
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::scheduler::{ComponentCtxFancy, Scheduler};
 

	
 
use super::ConnectorId;
 
use super::native::Connector;
 
use super::scheduler::{SchedulerCtx, ConnectorCtx};
 
use super::scheduler::{
 
    SchedulerCtx, ComponentCtxFancy, ComponentPortChange,
 
    ReceivedMessage
 
};
 
use super::inbox::{
 
    PrivateInbox, PublicInbox,
 
    DataMessage, SyncMessage, SolutionMessage, Message, MessageContents,
 
    PublicInbox,
 
    DataMessage, SyncMessage, SolutionMessage, MessageContents,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::{Port, PortKind, PortIdLocal};
 
use super::port::{PortKind, PortIdLocal};
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
@@ -65,7 +67,7 @@ pub(crate) struct Branch {
 
    next_branch_in_queue: Option<u32>,
 
    // Message/port state
 
    received: HashMap<PortIdLocal, DataMessage>, // TODO: @temporary, remove together with fires()
 
    ports_delta: Vec<PortOwnershipDelta>,
 
    ports_delta: Vec<ComponentPortChange>,
 
}
 

	
 
impl Branch {
 
@@ -154,12 +156,6 @@ impl PortAssignment {
 
    }
 
}
 

	
 
#[derive(Clone)]
 
struct PortOwnershipDelta {
 
    acquired: bool, // if false, then released ownership
 
    port_id: PortIdLocal,
 
}
 

	
 
#[derive(Debug)]
 
enum PortOwnershipError {
 
    UsedInInteraction(PortIdLocal),
 
@@ -331,8 +327,6 @@ impl ConnectorPublic {
 
// TODO: Maybe prevent false sharing by aligning `public` to next cache line.
 
// TODO: Do this outside of the connector, create a wrapping struct
 
pub(crate) struct ConnectorPDL {
 
    // State and properties of connector itself
 
    in_sync: bool,
 
    // Branch management
 
    branches: Vec<Branch>, // first branch is always non-speculative one
 
    sync_active: BranchQueue,
 
@@ -342,7 +336,6 @@ pub(crate) struct ConnectorPDL {
 
    cur_round: u32,
 
    // Port/message management
 
    pub committed_to: Option<(ConnectorId, u64)>,
 
    pub inbox: PrivateInbox,
 
    pub ports: ConnectorPorts,
 
}
 

	
 
@@ -350,7 +343,7 @@ pub(crate) struct ConnectorPDL {
 
struct ConnectorRunContext<'a> {
 
    branch_index: u32,
 
    ports: &'a ConnectorPorts,
 
    ports_delta: &'a Vec<PortOwnershipDelta>,
 
    ports_delta: &'a Vec<ComponentPortChange>,
 
    received: &'a HashMap<PortIdLocal, DataMessage>,
 
    scheduler: SchedulerCtx<'a>,
 
    prepared_channel: Option<(Value, Value)>,
 
@@ -358,7 +351,7 @@ struct ConnectorRunContext<'a> {
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        if self.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) {
 
        if self.ports_delta.iter().any(|v| v.port.self_id.index == port.0.u32_suffix) {
 
            // Either acquired or released, must be silent
 
            return false;
 
        }
 
@@ -378,7 +371,7 @@ impl<'a> RunContext for ConnectorRunContext<'a> {
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        if self.ports_delta.iter().any(|v| v.port_id == port_id) {
 
        if self.ports_delta.iter().any(|v| v.port.self_id == port_id) {
 
            return None
 
        }
 

	
 
@@ -398,53 +391,10 @@ impl<'a> RunContext for ConnectorRunContext<'a> {
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message.contents {
 
            MC::Data(content) => self.handle_data_message(message.receiving_port, content),
 
            MC::Sync(content) => self.handle_sync_message(content, ctx, delta_state),
 
            MC::RequestCommit(content) => self.handle_request_commit_message(content, ctx, delta_state),
 
            MC::ConfirmCommit(content) => self.handle_confirm_commit_message(content, ctx, delta_state),
 
            MC::Control(_) | MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            // Check for new messages we haven't seen before. If any of the
 
            // pending branches can accept the message, do so.
 
            while let Some((target_port_id, message)) = comp_ctx.read_next_message() {
 
                let mut branch_idx = self.sync_pending_get.first;
 
                while branch_idx != 0 {
 
                    let branch = &self.branches[branch_idx as usize];
 
                    let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0);
 

	
 
                    let target_port_index = self.ports.get_port_index(*target_port_id).unwrap();
 
                    let port_mapping = self.ports.get_port(branch_idx, target_port_index);
 

	
 
                    if branch.sync_state == SpeculativeState::HaltedAtBranchPoint &&
 
                        branch.halted_at_port == *target_port_id &&
 
                        port_mapping.last_registered_branch_id == message.sender_prev_branch_id {
 
                        // Branch may accept this mesage, so create a fork that
 
                        // contains this message in the inbox.
 
                        let new_branch_idx = self.branches.len() as u32;
 
                        let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch);
 

	
 
                        self.ports.prepare_sync_branch(branch_idx, new_branch_idx);
 
                        let mapping = self.ports.get_port_mut(branch_idx, target_port_index);
 
                        mapping.last_registered_branch_id = message.sender_cur_branch_id;
 

	
 
                        let new_branch_id = BranchId::new(new_branch_idx);
 
                        self.branches.push(new_branch);
 
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id)
 
                    }
 

	
 
                    branch_idx = next_branch_idx;
 
                }
 
            }
 

	
 
            let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx, conn_ctx, delta_state);
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        self.handle_new_messages(comp_ctx);
 
        if comp_ctx.is_in_sync() {
 
            let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
@@ -466,9 +416,9 @@ impl Connector for ConnectorPDL {
 

	
 
                    // Turn local solution into a message and send it along
 
                    // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, conn_ctx);
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, comp_ctx);
 
                    if let Some(valid_solution) = solution_message {
 
                        self.submit_sync_solution(valid_solution, conn_ctx, delta_state);
 
                        self.submit_sync_solution(valid_solution, comp_ctx);
 
                    } else {
 
                        // Branch is actually invalid, but we only just figured
 
                        // it out. We need to mark it as invalid to prevent
 
@@ -493,7 +443,7 @@ impl Connector for ConnectorPDL {
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx, conn_ctx, delta_state);
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
            return scheduling;
 
        }
 
    }
 
@@ -505,7 +455,6 @@ impl ConnectorPDL {
 
    /// hence is in a non-sync state.
 
    pub fn new(initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
@@ -513,28 +462,71 @@ impl ConnectorPDL {
 
            sync_finished_last_handled: 0, // none at all
 
            cur_round: 0,
 
            committed_to: None,
 
            inbox: PrivateInbox::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
        }
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling connector messages
 
    // -------------------------------------------------------------------------
 

	
 
    pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
 
        // self.inbox.insert_message(target_port, message);
 
    pub fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtxFancy) {
 
        while let Some(message) = comp_ctx.read_next_message() {
 
            match message {
 
                ReceivedMessage::Data((target_port_id, contents)) => {
 
                    self.handle_data_message(target_port_id, &contents);
 
                },
 
                ReceivedMessage::Sync(contents) => {
 
                    self.handle_sync_message(contents, comp_ctx);
 
                },
 
                ReceivedMessage::RequestCommit(contents) => {
 
                    self.handle_request_commit_message(contents, comp_ctx);
 
                },
 
                ReceivedMessage::ConfirmCommit(contents) => {
 
                    self.handle_confirm_commit_message(contents, comp_ctx);
 
                },
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_data_message(&mut self, target_port_id: PortIdLocal, message: &DataMessage) {
 
        // Go through all branches that are waiting for a message
 
        let mut branch_idx = self.sync_pending_get.first;
 
        while branch_idx != 0 {
 
            let branch = &self.branches[branch_idx as usize];
 
            let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0);
 

	
 
            let target_port_index = self.ports.get_port_index(target_port_id).unwrap();
 
            let port_mapping = self.ports.get_port(branch_idx, target_port_index);
 

	
 
            // Check if the branch may accept the message
 
            if branch.sync_state == SpeculativeState::HaltedAtBranchPoint &&
 
                branch.halted_at_port == target_port_id &&
 
                port_mapping.last_registered_branch_id == message.sender_prev_branch_id
 
            {
 
                // Branch can accept. So fork it, and let the fork accept the
 
                // message. The original branch stays waiting for new messages.
 
                let new_branch_idx = self.branches.len() as u32;
 
                let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch);
 

	
 
                self.ports.prepare_sync_branch(branch_idx, new_branch_idx);
 
                let mapping = self.ports.get_port_mut(branch_idx, target_port_index);
 
                mapping.last_registered_branch_id = message.sender_cur_branch_id;
 

	
 
                let new_branch_id = BranchId::new(new_branch_idx);
 
                self.branches.push(new_branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id)
 
            }
 

	
 
            branch_idx = next_branch_idx;
 
        }
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate.
 
    pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints
 
    pub fn handle_sync_message(&mut self, message: SyncMessage, comp_ctx: &mut ComponentCtxFancy) {
 
        debug_assert!(!message.to_visit.contains(&comp_ctx.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == comp_ctx.id)); // we have constraints
 

	
 
        // TODO: Optimize, use some kind of temp workspace vector
 
        let mut execution_path_branch_ids = Vec::new();
 
@@ -543,7 +535,7 @@ impl ConnectorPDL {
 
            // We have some solutions to match against
 
            let constraints_index = message.constraints
 
                .iter()
 
                .position(|v| v.connector_id == ctx.id)
 
                .position(|v| v.connector_id == comp_ctx.id)
 
                .unwrap();
 
            let constraints = &message.constraints[constraints_index].constraints;
 
            debug_assert!(!constraints.is_empty());
 
@@ -621,7 +613,7 @@ impl ConnectorPDL {
 
                // - replace constraints with a local solution
 
                new_solution.constraints.remove(constraints_index);
 
                new_solution.local_solutions.push(SyncConnectorSolution{
 
                    connector_id: ctx.id,
 
                    connector_id: comp_ctx.id,
 
                    terminating_branch_id: BranchId::new(branch_index),
 
                    execution_branch_ids: execution_path_branch_ids.clone(),
 
                    final_port_mapping: new_solution_mapping,
 
@@ -633,7 +625,7 @@ impl ConnectorPDL {
 
                    let port_id = self.ports.get_port_id(port_index);
 

	
 
                    let (peer_connector_id, peer_port_id, peer_is_getter) = {
 
                        let port = ctx.get_port(port_id);
 
                        let port = comp_ctx.get_port_by_id(port_id).unwrap();
 
                        (port.peer_connector, port.peer_id, port.kind == PortKind::Putter)
 
                    };
 

	
 
@@ -658,7 +650,7 @@ impl ConnectorPDL {
 
                // If here, then the newly generated solution is completely
 
                // compatible.
 
                let next_branch = branch.next_branch_in_queue;
 
                self.submit_sync_solution(new_solution, ctx, results);
 
                self.submit_sync_solution(new_solution, comp_ctx);
 

	
 
                // Consider the next branch
 
                if branch_index == self.sync_finished_last_handled {
 
@@ -672,7 +664,7 @@ impl ConnectorPDL {
 
        }
 
    }
 

	
 
    fn handle_request_commit_message(&mut self, mut message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
    fn handle_request_commit_message(&mut self, mut message: SolutionMessage, comp_ctx: &mut ComponentCtxFancy) {
 
        let should_propagate_message = match &self.committed_to {
 
            Some((previous_origin, previous_comparison)) => {
 
                // Already committed to something. So will commit to this if it
 
@@ -695,22 +687,22 @@ impl ConnectorPDL {
 
                // TODO: Use temporary workspace
 
                let mut to_visit = Vec::with_capacity(message.local_solutions.len() - 1);
 
                for (connector_id, _) in &message.local_solutions {
 
                    if *connector_id != ctx.id {
 
                    if *connector_id != comp_ctx.id {
 
                        to_visit.push(*connector_id);
 
                    }
 
                }
 

	
 
                message.to_visit = to_visit;
 
                self.handle_confirm_commit_message(message.clone(), ctx, delta_state);
 
                delta_state.outbox.push(MessageContents::ConfirmCommit(message));
 
                comp_ctx.submit_message(MessageContents::ConfirmCommit(message.clone()));
 
                self.handle_confirm_commit_message(message, comp_ctx);
 
            } else {
 
                // Not yet visited all of the connectors
 
                delta_state.outbox.push(MessageContents::RequestCommit(message));
 
                comp_ctx.submit_message(MessageContents::RequestCommit(message));
 
            }
 
        }
 
    }
 

	
 
    fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
 
    fn handle_confirm_commit_message(&mut self, message: SolutionMessage, comp_ctx: &mut ComponentCtxFancy) {
 
        // Make sure this is the message we actually committed to. As long as
 
        // we're running on a single machine this is fine.
 
        // TODO: Take care of nefarious peers
 
@@ -722,16 +714,15 @@ impl ConnectorPDL {
 
        // Find the branch we're supposed to commit to
 
        let (_, branch_id) = message.local_solutions
 
            .iter()
 
            .find(|(id, _)| *id == ctx.id)
 
            .find(|(id, _)| *id == comp_ctx.id)
 
            .unwrap();
 
        let branch_id = *branch_id;
 

	
 
        // Commit to the branch. That is: move the solution branch to the first
 
        // of the connector's branches
 
        self.in_sync = false;
 
        self.branches.swap(0, branch_id.index as usize);
 
        self.branches.truncate(1); // TODO: Or drain and do not deallocate?
 
        let solution = &mut self.branches[0];
 
        let solution_branch = &mut self.branches[0];
 

	
 
        // Clear all of the other sync-related variables
 
        self.sync_active.clear();
 
@@ -741,18 +732,20 @@ impl ConnectorPDL {
 
        self.cur_round += 1;
 

	
 
        self.committed_to = None;
 
        self.inbox.clear();
 
        self.ports.commit_to_sync();
 

	
 
        // Add/remove any of the ports we lost during the sync phase
 
        for port_delta in &solution.ports_delta {
 
            if port_delta.acquired {
 
                self.ports.add_port(port_delta.port_id);
 
        // TODO: Probably might not need this with the port syncing
 
        for port_delta in &solution_branch.ports_delta {
 
            if port_delta.is_acquired {
 
                self.ports.add_port(port_delta.port.self_id);
 
            } else {
 
                self.ports.remove_port(port_delta.port_id);
 
                self.ports.remove_port(port_delta.port.self_id);
 
            }
 
        }
 
        solution.commit_to_sync();
 

	
 
        comp_ctx.notify_sync_end(&solution_branch.ports_delta);
 
        solution_branch.commit_to_sync();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
@@ -764,8 +757,8 @@ impl ConnectorPDL {
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        debug_assert!(comp_ctx.is_in_sync());
 

	
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
@@ -864,7 +857,7 @@ impl ConnectorPDL {
 

	
 
                    // But if some messages can be immediately applied, do so
 
                    // now.
 
                    let messages = comp_ctx.get_read_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    let messages = comp_ctx.get_read_data_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    let mut did_have_messages = false;
 

	
 
                    for message in messages {
 
@@ -884,10 +877,9 @@ impl ConnectorPDL {
 

	
 
                        // If the message contains any ports then they will now
 
                        // be owned by the new branch
 
                        debug_assert!(results.ports.is_empty());
 
                        find_ports_in_value_group(&message.message, &mut results.ports);
 
                        Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports);
 
                        results.ports.clear();
 
                        let mut transferred_ports = Vec::new(); // TODO: Create workspace somewhere
 
                        find_ports_in_value_group(&message.message, &mut transferred_ports);
 
                        Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &transferred_ports);
 

	
 
                        // Schedule the new branch
 
                        debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync);
 
@@ -959,12 +951,11 @@ impl ConnectorPDL {
 

	
 
                    // If the message contains any ports then we release our
 
                    // ownership over them in this branch
 
                    debug_assert!(results.ports.is_empty());
 
                    find_ports_in_value_group(&message.message, &mut results.ports);
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports).unwrap();
 
                    results.ports.clear();
 
                    let mut transferred_ports = Vec::new(); // TODO: Put in some temp workspace
 
                    find_ports_in_value_group(&message.message, &mut transferred_ports);
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &transferred_ports).unwrap();
 

	
 
                    results.outbox.push(MessageContents::Data(message));
 
                    comp_ctx.submit_message(MessageContents::Data(message));
 

	
 
                    let branch_index = branch.index;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, branch_index);
 
@@ -986,8 +977,8 @@ impl ConnectorPDL {
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        debug_assert!(!comp_ctx.is_in_sync());
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
@@ -1013,7 +1004,14 @@ impl ConnectorPDL {
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution and reschedule immediately
 
                self.in_sync = true;
 
                // TODO: Not sure about this. I want a clear synchronization
 
                //  point between scheduler/component view on the ports. But is
 
                //  this the way to do it?
 
                let current_ports = comp_ctx.notify_sync_start();
 
                for port in current_ports {
 
                    debug_assert!(self.ports.get_port_index(port.self_id).is_some());
 
                }
 

	
 
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
 
                let first_sync_branch_id = first_sync_branch.index;
 
                self.ports.prepare_sync_branch(0, 1);
 
@@ -1025,12 +1023,12 @@ impl ConnectorPDL {
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Construction of a new component. Find all references to ports
 
                // inside of the arguments
 
                debug_assert!(results.ports.is_empty());
 
                find_ports_in_value_group(&arguments, &mut results.ports);
 
                let mut transferred_ports = Vec::new();
 
                find_ports_in_value_group(&arguments, &mut transferred_ports);
 

	
 
                if !results.ports.is_empty() {
 
                if !transferred_ports.is_empty() {
 
                    // Ports changing ownership
 
                    if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &results.ports) {
 
                    if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &transferred_ports) {
 
                        todo!("fatal error handling");
 
                    }
 
                }
 
@@ -1043,25 +1041,25 @@ impl ConnectorPDL {
 
                        definition_id, monomorph_idx, arguments
 
                    )
 
                };
 
                let new_connector_ports = results.ports.clone(); // TODO: Do something with this
 

	
 
                let new_connector_branch = Branch::new_initial_branch(new_connector_state);
 
                let new_connector = ConnectorPDL::new(new_connector_branch, new_connector_ports);
 
                let new_connector = ConnectorPDL::new(new_connector_branch, transferred_ports);
 

	
 
                results.new_connectors.push(new_connector);
 
                comp_ctx.push_component(new_connector);
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewChannel => {
 
                // Need to prepare a new channel
 
                let (getter, putter) = sched_ctx.runtime.create_channel(conn_ctx.id);
 
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
 
                debug_assert_eq!(getter.kind, PortKind::Getter);
 
                branch.prepared_channel = Some((
 
                    Value::Input(PortId::new(putter.self_id.index)),
 
                    Value::Output(PortId::new(getter.self_id.index))
 
                ));
 

	
 
                results.new_ports.push(putter);
 
                results.new_ports.push(getter);
 
                comp_ctx.push_port(putter);
 
                comp_ctx.push_port(getter);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
@@ -1201,26 +1199,27 @@ impl ConnectorPDL {
 
                    }
 

	
 
                    for delta in &branch.ports_delta {
 
                        if delta.port_id == *port_id {
 
                        if delta.port.self_id == *port_id {
 
                            // We cannot have acquired this port, because the
 
                            // call to `ports.get_port_index` returned an index.
 
                            debug_assert!(!delta.acquired);
 
                            debug_assert!(!delta.is_acquired);
 
                            return Err(PortOwnershipError::AlreadyGivenAway(*port_id));
 
                        }
 
                    }
 

	
 
                    branch.ports_delta.push(PortOwnershipDelta{
 
                        acquired: false,
 
                        port_id: *port_id,
 
                    });
 
                    // TODO: Obtain port description
 
                    // branch.ports_delta.push(ComponentPortChange{
 
                    //     is_acquired: false,
 
                    //     port_id: *port_id,
 
                    // });
 
                },
 
                None => {
 
                    // Not in port mapping, so we must have acquired it before,
 
                    // remove the acquirement.
 
                    let mut to_delete_index: isize = -1;
 
                    for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
 
                        if delta.port_id == *port_id {
 
                            debug_assert!(delta.acquired);
 
                        if delta.port.self_id == *port_id {
 
                            debug_assert!(delta.is_acquired);
 
                            to_delete_index = delta_idx as isize;
 
                            break;
 
                        }
 
@@ -1246,8 +1245,8 @@ impl ConnectorPDL {
 

	
 
        'port_loop: for port_id in port_ids {
 
            for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
 
                if delta.port_id == *port_id {
 
                    if delta.acquired {
 
                if delta.port.self_id == *port_id {
 
                    if delta.is_acquired {
 
                        // Somehow already received this port.
 
                        // TODO: @security
 
                        todo!("take care of nefarious peers");
 
@@ -1261,10 +1260,11 @@ impl ConnectorPDL {
 
            }
 

	
 
            // If here then we can safely acquire the new port
 
            branch.ports_delta.push(PortOwnershipDelta{
 
                acquired: true,
 
                port_id: *port_id,
 
            });
 
            // TODO: Retrieve port infor
 
            // branch.ports_delta.push(PortOwnershipDelta{
 
            //     acquired: true,
 
            //     port_id: *port_id,
 
            // });
 
        }
 

	
 
        return Ok(())
 
@@ -1276,8 +1276,8 @@ impl ConnectorPDL {
 
    /// Generates the initial solution for a finished sync branch. If initial
 
    /// local solution is valid, then the appropriate message is returned.
 
    /// Otherwise the initial solution is inconsistent.
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, ctx: &ConnectorCtx) -> Option<SyncMessage> {
 
        // Retrieve branchg
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, comp_ctx: &ComponentCtxFancy) -> Option<SyncMessage> {
 
        // Retrieve branchh
 
        debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode
 
        let branch = &self.branches[branch_id.index as usize];
 
        debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd);
 
@@ -1301,7 +1301,7 @@ impl ConnectorPDL {
 
        }
 

	
 
        let initial_local_solution = SyncConnectorSolution{
 
            connector_id: ctx.id,
 
            connector_id: comp_ctx.id,
 
            terminating_branch_id: branch_id,
 
            execution_branch_ids: all_branch_ids,
 
            final_port_mapping: initial_solution_port_mapping,
 
@@ -1317,12 +1317,12 @@ impl ConnectorPDL {
 
            // sender and one for the receiver, ensuring it was not used.
 
            // TODO: This will fail if a port is passed around multiple times.
 
            //  maybe a special "passed along" entry in `ports_delta`.
 
            if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)).unwrap() {
 
            if !sync_message.check_constraint(comp_ctx.id, SyncBranchConstraint::SilentPort(port_delta.port.self_id)).unwrap() {
 
                return None;
 
            }
 

	
 
            // Might need to check if we own the other side of the channel
 
            let port = ctx.get_port(port_delta.port_id);
 
            let port = comp_ctx.get_port_by_id(port_delta.port.self_id).unwrap();
 
            if !sync_message.add_or_check_constraint(port.peer_connector, SyncBranchConstraint::SilentPort(port.peer_id)).unwrap() {
 
                return None;
 
            }
 
@@ -1332,7 +1332,7 @@ impl ConnectorPDL {
 
        for port_index in 0..self.ports.num_ports() {
 
            let port_id = self.ports.get_port_id(port_index);
 
            let port_mapping = self.ports.get_port(branch_id.index, port_index);
 
            let port = ctx.get_port(port_id);
 
            let port = comp_ctx.get_port_by_id(port_id).unwrap();
 

	
 
            let constraint = if port_mapping.is_assigned {
 
                if port.kind == PortKind::Getter {
 
@@ -1352,7 +1352,7 @@ impl ConnectorPDL {
 
        return Some(sync_message);
 
    }
 

	
 
    fn submit_sync_solution(&mut self, partial_solution: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
    fn submit_sync_solution(&mut self, partial_solution: SyncMessage, comp_ctx: &mut ComponentCtxFancy) {
 
        if partial_solution.to_visit.is_empty() {
 
            // Solution is completely consistent. So ask everyone to commit
 
            // TODO: Maybe another package for random?
 
@@ -1366,24 +1366,24 @@ impl ConnectorPDL {
 

	
 
            let mut full_solution = SolutionMessage{
 
                comparison_number,
 
                connector_origin: ctx.id,
 
                connector_origin: comp_ctx.id,
 
                local_solutions: Vec::with_capacity(num_local),
 
                to_visit: Vec::with_capacity(num_local - 1),
 
            };
 

	
 
            for local_solution in &partial_solution.local_solutions {
 
                full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id));
 
                if local_solution.connector_id != ctx.id {
 
                if local_solution.connector_id != comp_ctx.id {
 
                    full_solution.to_visit.push(local_solution.connector_id);
 
                }
 
            }
 

	
 
            debug_assert!(self.committed_to.is_none());
 
            self.committed_to = Some((full_solution.connector_origin, full_solution.comparison_number));
 
            results.outbox.push(MessageContents::RequestCommit(full_solution));
 
            comp_ctx.submit_message(MessageContents::RequestCommit(full_solution));
 
        } else {
 
            // Still have connectors to visit
 
            results.outbox.push(MessageContents::Sync(partial_solution));
 
            comp_ctx.submit_message(MessageContents::Sync(partial_solution));
 
        }
 
    }
 

	
 
@@ -1401,33 +1401,6 @@ impl ConnectorPDL {
 
    }
 
}
 

	
 
/// A data structure passed to a connector whose code is being executed that is
 
/// used to queue up various state changes that have to be applied after
 
/// running, e.g. the messages the have to be transferred to other connectors.
 
// TODO: Come up with a better name
 
pub(crate) struct RunDeltaState {
 
    // Variables that allow the thread running the connector to pick up global
 
    // state changes and try to apply them.
 
    pub outbox: Vec<MessageContents>,
 
    pub new_connectors: Vec<ConnectorPDL>,
 
    pub new_ports: Vec<Port>,
 
    // Workspaces
 
    pub ports: Vec<PortIdLocal>,
 
}
 

	
 
impl RunDeltaState {
 
    /// Constructs a new `RunDeltaState` object with the default amount of
 
    /// reserved memory
 
    pub fn new() -> Self {
 
        RunDeltaState{
 
            outbox: Vec::with_capacity(64),
 
            new_connectors: Vec::new(),
 
            new_ports: Vec::new(),
 
            ports: Vec::with_capacity(64),
 
        }
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
src/runtime2/inbox.rs
Show inline comments
 
@@ -239,107 +239,4 @@ impl PublicInbox {
 
        let lock = self.messages.lock().unwrap();
 
        return lock.is_empty();
 
    }
 
}
 

	
 
pub(crate) struct PrivateInbox {
 
    // "Normal" messages, intended for a PDL protocol. These need to stick
 
    // around during an entire sync-block (to handle `put`s for which the
 
    // corresponding `get`s have not yet been reached).
 
    messages: Vec<(PortIdLocal, DataMessage)>,
 
    len_read: usize,
 
}
 

	
 
impl PrivateInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Vec::new(),
 
            len_read: 0,
 
        }
 
    }
 

	
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
 
    /// nothing is inserted..
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
 
        for (existing_target_port, existing) in self.messages.iter() {
 
            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
 
                    existing.sender_cur_branch_id == message.sender_cur_branch_id &&
 
                    *existing_target_port == target_port {
 
                // Message was already received
 
                return;
 
            }
 
        }
 

	
 
        self.messages.push((target_port, message));
 
    }
 

	
 
    /// Retrieves all previously read messages that satisfy the provided
 
    /// speculative conditions. Note that the inbox remains read-locked until
 
    /// the returned iterator is dropped. Should only be called by the
 
    /// inbox-reader (i.e. the thread executing a connector's PDL code).
 
    ///
 
    /// This function should only be used to check if already-received messages
 
    /// could be received by a newly encountered `get` call in a connector's
 
    /// PDL code.
 
    pub(crate) fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
 
        return InboxMessageIter {
 
            messages: &self.messages,
 
            next_index: 0,
 
            max_index: self.len_read,
 
            match_port_id: port_id,
 
            match_prev_branch_id: prev_branch_id,
 
        };
 
    }
 

	
 
    /// Retrieves the next unread message. Should only be called by the
 
    /// inbox-reader.
 
    pub(crate) fn next_message(&mut self) -> Option<(&PortIdLocal, &DataMessage)> {
 
        if self.len_read == self.messages.len() {
 
            return None;
 
        }
 

	
 
        let (target_port, message) = &self.messages[self.len_read];
 
        self.len_read += 1;
 
        return Some((target_port, message));
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub(crate) fn clear(&mut self) {
 
        self.messages.clear();
 
        self.len_read = 0;
 
    }
 
}
 

	
 
/// Iterator over previously received messages in the inbox.
 
pub(crate) struct InboxMessageIter<'i> {
 
    messages: &'i Vec<(PortIdLocal, DataMessage)>,
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
    match_prev_branch_id: BranchId,
 
}
 

	
 
impl<'i> Iterator for InboxMessageIter<'i> {
 
    type Item = &'i DataMessage;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let (target_port, cur_message) = &self.messages[self.next_index];
 
            if *target_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
 
                // Found a match
 
                break;
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        if self.next_index == self.max_index {
 
            return None;
 
        }
 

	
 
        let (_, message) = &self.messages[self.next_index];
 
        self.next_index += 1;
 
        return Some(message);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
@@ -21,11 +21,11 @@ use crate::collections::RawVec;
 
use crate::ProtocolDescription;
 

	
 
use inbox::Message;
 
use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use scheduler::{Scheduler, ConnectorCtx, ControlMessageHandler};
 
use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling};
 
use scheduler::{Scheduler, ControlMessageHandler};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use crate::runtime2::port::{Port, PortState};
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx};
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
@@ -78,24 +78,17 @@ pub(crate) enum ConnectorVariant {
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
    fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
 
        }
 
    }
 

	
 
    fn run(&mut self, scheduler_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, conn_ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.run(scheduler_ctx, conn_ctx, delta_state),
 
            ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx),
 
            ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx),
 
        }
 
    }
 
}
 

	
 
pub(crate) struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
 
    pub ctx_fancy: ComponentCtxFancy,
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: ControlMessageHandler,
 
    pub shutting_down: bool,
 
@@ -263,7 +256,8 @@ impl RuntimeInner {
 

	
 
    // --- Creating/retrieving/destroying components
 

	
 
    pub(crate) fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey {
 
    /// Creates an initially sleeping application connector.
 
    fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey {
 
        // Initialize as sleeping, as it will be scheduled by the programmer.
 
        let mut lock = self.connectors.write().unwrap();
 
        let key = lock.create(ConnectorVariant::Native(Box::new(component)), true);
 
@@ -272,35 +266,17 @@ impl RuntimeInner {
 
        return key;
 
    }
 

	
 
    /// Creates a new PDL component. The caller MUST make sure to schedule the
 
    /// connector.
 
    // TODO: Nicer code, not forcing the caller to schedule, perhaps?
 
    pub(crate) fn create_pdl_component(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
 
    /// Creates a new PDL component. This function just creates the component.
 
    /// If you create it initially awake, then you must add it to the work
 
    /// queue. Other aspects of correctness (i.e. setting initial ports) are
 
    /// relinquished to the caller!
 
    pub(crate) fn create_pdl_component(&self, connector: ConnectorPDL, initially_sleeping: bool) -> ConnectorKey {
 
        // Create as not sleeping, as we'll schedule it immediately
 
        let key = {
 
            let mut lock = self.connectors.write().unwrap();
 
            lock.create(ConnectorVariant::UserDefined(connector), false)
 
            lock.create(ConnectorVariant::UserDefined(connector), initially_sleeping)
 
        };
 

	
 
        // Transfer the ports
 
        {
 
            let lock = self.connectors.read().unwrap();
 
            let created = lock.get_private(&key);
 

	
 
            match &created.connector {
 
                ConnectorVariant::UserDefined(connector) => {
 

	
 
                    println!("DEBUG: The connector {} owns the ports: {:?}", key.index, connector.ports.owned_ports.iter().map(|v| v.index).collect::<Vec<_>>());
 
                    for port_id in connector.ports.owned_ports.iter().copied() {
 
                        println!("DEBUG: Transferring port {:?} from {} to {}", port_id, created_by.context.id.0, key.index);
 
                        let port = created_by.context.remove_port(port_id);
 
                        created.context.add_port(port);
 
                    }
 
                },
 
                ConnectorVariant::Native(_) => unreachable!(),
 
            }
 
        }
 

	
 
        self.increment_active_components();
 
        return key;
 
    }
 
@@ -428,7 +404,7 @@ impl ConnectorStore {
 
    fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey {
 
        let mut connector = ScheduledConnector {
 
            connector,
 
            context: ConnectorCtx::new(),
 
            ctx_fancy: ComponentCtxFancy::new_empty(),
 
            public: ConnectorPublic::new(initially_sleeping),
 
            router: ControlMessageHandler::new(),
 
            shutting_down: false,
 
@@ -441,7 +417,7 @@ impl ConnectorStore {
 
            // No free entries, allocate new entry
 
            index = self.connectors.len();
 
            key = ConnectorKey{ index: index as u32 };
 
            connector.context.id = key.downcast();
 
            connector.ctx_fancy.id = key.downcast();
 

	
 
            let connector = Box::into_raw(Box::new(connector));
 
            self.connectors.push(connector);
 
@@ -449,7 +425,7 @@ impl ConnectorStore {
 
            // Free spot available
 
            index = self.free.pop().unwrap();
 
            key = ConnectorKey{ index: index as u32 };
 
            connector.context.id = key.downcast();
 
            connector.ctx_fancy.id = key.downcast();
 

	
 
            unsafe {
 
                let target = self.connectors.get_mut(index);
src/runtime2/native.rs
Show inline comments
 
@@ -4,25 +4,21 @@ use std::sync::atomic::Ordering;
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::scheduler::ComponentCtxFancy;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx};
 
use super::scheduler::SchedulerCtx;
 
use super::{ConnectorKey, ConnectorId, RuntimeInner};
 
use super::scheduler::{SchedulerCtx, ComponentCtxFancy, ReceivedMessage};
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::connector::{Branch, ConnectorScheduling, RunDeltaState, ConnectorPDL};
 
use super::connector::{Branch, ConnectorScheduling, ConnectorPDL};
 
use super::connector::find_ports_in_value_group;
 
use super::inbox::{Message, MessageContents};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
    /// Handle a new message (preprocessed by the scheduler). You probably only
 
    /// want to handle `Data`, `Sync`, and `Solution` messages. The others are
 
    /// intended for the scheduler itself.
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 

	
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
    /// One should generally request and handle new messages from the component
 
    /// context. Then perform any logic the component has to do, and in the
 
    /// process perhaps queue up some state changes using the same context.
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
@@ -46,7 +42,10 @@ impl ConnectorApplication {
 
        let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() };
 
        let connector = ConnectorApplication {
 
            sync_done: sync_done.clone(),
 
            job_queue: job_queue.clone()
 
        };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
@@ -54,36 +53,35 @@ impl ConnectorApplication {
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn handle_message(&mut self, message: Message, _ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message.contents {
 
            MC::Data(_) => unreachable!("data message in API connector"),
 
            MC::Sync(_) | MC::RequestCommit(_) | MC::ConfirmCommit(_) => {
 
                // Handling sync in API
 
            },
 
            MC::Control(_) => {},
 
            MC::Ping => {},
 
    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        // Handle any incoming messages if we're participating in a round
 
        while let Some(message) = comp_ctx.read_next_message() {
 
            match message {
 
                ReceivedMessage::Data(_) => todo!("data message in API connector"),
 
                ReceivedMessage::Sync(_) | ReceivedMessage::RequestCommit(_) | ReceivedMessage::ConfirmCommit(_) => {
 
                    todo!("sync message in API connector");
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    println!("DEBUG: API adopting ports");
 
                    delta_state.new_ports.reserve(2);
 
                    delta_state.new_ports.push(endpoint_a);
 
                    delta_state.new_ports.push(endpoint_b);
 
                }
 
                ApplicationJob::NewConnector(connector) => {
 
                    println!("DEBUG: API creating connector");
 
                    delta_state.new_connectors.push(connector);
 
                },
 
                ApplicationJob::Shutdown => {
 
                    debug_assert!(queue.is_empty());
 
                    return ConnectorScheduling::Exit;
 
        // Handle requests coming from the API
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            while let Some(job) = queue.pop_front() {
 
                match job {
 
                    ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                        println!("DEBUG: API adopting ports");
 
                        comp_ctx.push_port(endpoint_a);
 
                        comp_ctx.push_port(endpoint_b);
 
                    }
 
                    ApplicationJob::NewConnector(connector) => {
 
                        println!("DEBUG: API creating connector");
 
                        comp_ctx.push_component(connector);
 
                    },
 
                    ApplicationJob::Shutdown => {
 
                        debug_assert!(queue.is_empty());
 
                        return ConnectorScheduling::Exit;
 
                    }
 
                }
 
            }
 
        }
src/runtime2/port.rs
Show inline comments
 
@@ -21,13 +21,13 @@ impl PortIdLocal {
 
    }
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 
pub enum PortState {
 
    Open,
 
    Closed,
src/runtime2/scheduler.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::runtime2::connector::{BranchId, ConnectorPDL};
 
use crate::runtime2::inbox::{DataMessage, PrivateInbox};
 

	
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey, ConnectorVariant};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::connector::{ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};
 

	
 
/// Contains fields that are mostly managed by the scheduler, but may be
 
/// accessed by the connector
 
pub(crate) struct ConnectorCtx {
 
    pub(crate) id: ConnectorId,
 
    pub(crate) ports: Vec<Port>,
 
}
 

	
 
impl ConnectorCtx {
 
    pub(crate) fn new() -> ConnectorCtx {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn add_port(&mut self, port: Port) {
 
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
 
        self.ports.push(port);
 
    }
 

	
 
    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
 
        let index = self.port_id_to_index(id);
 
        return self.ports.remove(index);
 
    }
 

	
 
    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
 
        let index = self.port_id_to_index(id);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
 
        let index = self.port_id_to_index(id);
 
        return &mut self.ports[index];
 
    }
 

	
 
    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
 
        for (idx, port) in self.ports.iter().enumerate() {
 
            if port.self_id == id {
 
                return idx;
 
            }
 
        }
 

	
 
        panic!("port {:?}, not owned by connector", id);
 
    }
 
}
 
use super::connector::{BranchId, ConnectorPDL, ConnectorScheduling};
 
use super::inbox::{
 
    Message, MessageContents, ControlMessageVariant,
 
    DataMessage, ControlMessage, SolutionMessage, SyncMessage
 
};
 

	
 
// Because it contains pointers we're going to do a copy by value on this one
 
#[derive(Clone, Copy)]
 
@@ -74,8 +30,6 @@ impl Scheduler {
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        let mut delta_state = RunDeltaState::new();
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            self.debug("Waiting for work");
 
@@ -97,71 +51,7 @@ impl Scheduler {
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    // Check for rerouting
 
                    self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
 
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                        self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0));
 
                        self.runtime.send_message(other_connector_id, message);
 
                        continue;
 
                    }
 

	
 
                    self.debug_conn(connector_id, " ... Handling message myself");
 
                    // Check for messages that requires special action from the
 
                    // scheduler.
 
                    if let MessageContents::Control(content) = message.contents {
 
                        match content.content {
 
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                // Need to change port target
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.peer_connector = new_target_connector_id;
 

	
 
                                // Note: for simplicity we program the scheduler to always finish
 
                                // running a connector with an empty outbox. If this ever changes
 
                                // then accepting the "port peer changed" message implies we need
 
                                // to change the recipient of the message in the outbox.
 
                                debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                // And respond with an Ack
 
                                let ack_message = Message{
 
                                    sending_connector: connector_id,
 
                                    receiving_port: PortIdLocal::new_invalid(),
 
                                    contents: MessageContents::Control(ControlMessage{
 
                                        id: content.id,
 
                                        content: ControlMessageVariant::Ack,
 
                                    }),
 
                                };
 
                                self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
 
                                self.runtime.send_message(message.sending_connector, ack_message);
 
                            },
 
                            ControlMessageVariant::CloseChannel(port_id) => {
 
                                // Mark the port as being closed
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.state = PortState::Closed;
 

	
 
                                // Send an Ack
 
                                let ack_message = Message{
 
                                    sending_connector: connector_id,
 
                                    receiving_port: PortIdLocal::new_invalid(),
 
                                    contents: MessageContents::Control(ControlMessage{
 
                                        id: content.id,
 
                                        content: ControlMessageVariant::Ack,
 
                                    }),
 
                                };
 
                                self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
 
                                self.runtime.send_message(message.sending_connector, ack_message);
 

	
 
                            },
 
                            ControlMessageVariant::Ack => {
 
                                scheduled.router.handle_ack(content.id);
 
                            }
 
                        }
 
                    } else {
 
                        // Let connector handle message
 
                        scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state);
 
                    }
 
                }
 
                self.handle_inbox_messages(scheduled);
 

	
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
@@ -180,14 +70,12 @@ impl Scheduler {
 
                } else {
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(
 
                        scheduler_ctx, &scheduled.context, &mut delta_state
 
                    );
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx_fancy);
 
                    self.debug_conn(connector_id, "Finished running");
 

	
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_delta_state(scheduled, connector_key.downcast(), &mut delta_state);
 
                    self.handle_changes_in_context(scheduled);
 

	
 
                    cur_schedule = new_schedule;
 
                }
 
@@ -212,7 +100,7 @@ impl Scheduler {
 
                    // Prepare for exit. Set the shutdown flag and broadcast
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.context.ports {
 
                    for port in &scheduled.ctx_fancy.ports {
 
                        if port.state != PortState::Closed {
 
                            let message = scheduled.router.prepare_closing_channel(
 
                                port.self_id, port.peer_id,
 
@@ -234,119 +122,244 @@ impl Scheduler {
 
        }
 
    }
 

	
 
    fn handle_delta_state(&mut self,
 
        cur_connector: &mut ScheduledConnector, connector_id: ConnectorId,
 
        delta_state: &mut RunDeltaState
 
    ) {
 
        // Handling any messages that were sent
 
        if !delta_state.outbox.is_empty() {
 
            for mut message in delta_state.outbox.drain(..) {
 
                // Based on the message contents, decide where the message
 
                // should be sent to. This might end up modifying the message.
 
                self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
 
                let (peer_connector, self_port, peer_port) = match &mut message {
 
                    MessageContents::Data(contents) => {
 
                        let port = cur_connector.context.get_port(contents.sending_port);
 
                        (port.peer_connector, contents.sending_port, port.peer_id)
 
                    },
 
                    MessageContents::Sync(contents) => {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::RequestCommit(contents)=> {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::ConfirmCommit(contents) => {
 
                        for to_visit in &contents.to_visit {
 
                            let message = Message{
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx_fancy.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check for rerouting
 
            self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
 
            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0));
 
                self.runtime.send_message(other_connector_id, message);
 
                continue;
 
            }
 

	
 
            // Handle special messages here, messages for the component
 
            // will be added to the inbox.
 
            self.debug_conn(connector_id, " ... Handling message myself");
 
            match message.contents {
 
                MessageContents::Control(content) => {
 
                    match content.content {
 
                        ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                            // Need to change port target
 
                            let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap();
 
                            port.peer_connector = new_target_connector_id;
 

	
 
                            // Note: for simplicity we program the scheduler to always finish
 
                            // running a connector with an empty outbox. If this ever changes
 
                            // then accepting the "port peer changed" message implies we need
 
                            // to change the recipient of the message in the outbox.
 
                            debug_assert!(scheduled.ctx_fancy.outbox.is_empty());
 

	
 
                            // And respond with an Ack
 
                            let ack_message = Message{
 
                                sending_connector: connector_id,
 
                                receiving_port: PortIdLocal::new_invalid(),
 
                                contents: MessageContents::ConfirmCommit(contents.clone()),
 
                                contents: MessageContents::Control(ControlMessage{
 
                                    id: content.id,
 
                                    content: ControlMessageVariant::Ack,
 
                                }),
 
                            };
 
                            self.runtime.send_message(*to_visit, message);
 
                            self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_connector, ack_message);
 
                        },
 
                        ControlMessageVariant::CloseChannel(port_id) => {
 
                            // Mark the port as being closed
 
                            let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap();
 
                            port.state = PortState::Closed;
 

	
 
                            // Send an Ack
 
                            let ack_message = Message{
 
                                sending_connector: connector_id,
 
                                receiving_port: PortIdLocal::new_invalid(),
 
                                contents: MessageContents::Control(ControlMessage{
 
                                    id: content.id,
 
                                    content: ControlMessageVariant::Ack,
 
                                }),
 
                            };
 
                            self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_connector, ack_message);
 
                        },
 
                        ControlMessageVariant::Ack => {
 
                            scheduled.router.handle_ack(content.id);
 
                        }
 
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::Control(_) | MessageContents::Ping => {
 
                        // Never generated by the user's code
 
                        unreachable!();
 
                    }
 
                };
 
                },
 
                MessageContents::Ping => {
 
                    // Pings are sent just to wake up a component, so
 
                    // nothing to do here.
 
                },
 
                _ => {
 
                    // All other cases have to be handled by the component
 
                    scheduled.ctx_fancy.inbox_messages.push(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
                // TODO: Maybe clean this up, perhaps special case for
 
                //  ConfirmCommit can be handled differently.
 
                if peer_connector.is_valid() {
 
                    if peer_port.is_valid() {
 
                        // Sending a message to a port, so the port may not be
 
                        // closed.
 
                        let port = cur_connector.context.get_port(self_port);
 
                        match port.state {
 
                            PortState::Open => {},
 
                            PortState::Closed => {
 
                                todo!("Handling sending over a closed port");
 
                            }
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx_fancy.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(mut message) = scheduled.ctx_fancy.outbox.pop_front() {
 
            // Based on the message contents, decide where the message
 
            // should be sent to. This might end up modifying the message.
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
 
            let (peer_connector, self_port, peer_port) = match &mut message {
 
                MessageContents::Data(contents) => {
 
                    let port = scheduled.ctx_fancy.get_port_by_id(contents.sending_port).unwrap();
 
                    (port.peer_connector, contents.sending_port, port.peer_id)
 
                },
 
                MessageContents::Sync(contents) => {
 
                    let connector = contents.to_visit.pop().unwrap();
 
                    (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 
                },
 
                MessageContents::RequestCommit(contents)=> {
 
                    let connector = contents.to_visit.pop().unwrap();
 
                    (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 
                },
 
                MessageContents::ConfirmCommit(contents) => {
 
                    for to_visit in &contents.to_visit {
 
                        let message = Message{
 
                            sending_connector: scheduled.ctx_fancy.id,
 
                            receiving_port: PortIdLocal::new_invalid(),
 
                            contents: MessageContents::ConfirmCommit(contents.clone()),
 
                        };
 
                        self.runtime.send_message(*to_visit, message);
 
                    }
 
                    (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 
                },
 
                MessageContents::Control(_) | MessageContents::Ping => {
 
                    // Never generated by the user's code
 
                    unreachable!();
 
                }
 
            };
 

	
 
            // TODO: Maybe clean this up, perhaps special case for
 
            //  ConfirmCommit can be handled differently.
 
            if peer_connector.is_valid() {
 
                if peer_port.is_valid() {
 
                    // Sending a message to a port, so the port may not be
 
                    // closed.
 
                    let port = scheduled.ctx_fancy.get_port_by_id(self_port).unwrap();
 
                    match port.state {
 
                        PortState::Open => {},
 
                        PortState::Closed => {
 
                            todo!("Handling sending over a closed port");
 
                        }
 
                    }
 
                    let message = Message {
 
                        sending_connector: connector_id,
 
                        receiving_port: peer_port,
 
                        contents: message,
 
                    };
 
                    self.runtime.send_message(peer_connector, message);
 
                }
 
                let message = Message {
 
                    sending_connector: scheduled.ctx_fancy.id,
 
                    receiving_port: peer_port,
 
                    contents: message,
 
                };
 
                self.runtime.send_message(peer_connector, message);
 
            }
 
        }
 

	
 
        if !delta_state.new_ports.is_empty() {
 
            for port in delta_state.new_ports.drain(..) {
 
                cur_connector.context.ports.push(port);
 
            }
 
        }
 
        while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component) => {
 
                    // Add the new connector to the global registry
 
                    let new_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                    // Transfer ports
 
                    // TODO: Clean this up the moment native components are somewhat
 
                    //  properly implemented. We need to know about the ports that
 
                    //  are "owned by the PDL code", and then make sure that the
 
                    //  context contains a description of those ports.
 
                    let ports = if let ConnectorVariant::UserDefined(connector) = &new_connector.connector {
 
                        &connector.ports.owned_ports
 
                    } else {
 
                        unreachable!();
 
                    };
 

	
 
        // Handling any new connectors that were scheduled
 
        // TODO: Pool outgoing messages to reduce atomic access
 
        if !delta_state.new_connectors.is_empty() {
 
            for new_connector in delta_state.new_connectors.drain(..) {
 
                // Add to global registry to obtain key
 
                let new_key = self.runtime.create_pdl_component(cur_connector, new_connector);
 
                let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                // Call above changed ownership of ports, but we still have to
 
                // let the other end of the channel know that the port has
 
                // changed location.
 
                for port in &new_connector.context.ports {
 
                    let reroute_message = cur_connector.router.prepare_reroute(
 
                        port.self_id, port.peer_id, cur_connector.context.id,
 
                        port.peer_connector, new_connector.context.id
 
                    );
 

	
 
                    self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
 
                    self.runtime.send_message(port.peer_connector, reroute_message);
 
                }
 
                    for port_id in ports {
 
                        // Transfer messages associated with the transferred port
 
                        let mut message_idx = 0;
 
                        while message_idx < scheduled.ctx_fancy.inbox_messages.len() {
 
                            let message = &scheduled.ctx_fancy.inbox_messages[message_idx];
 
                            if message.receiving_port == *port_id {
 
                                // Need to transfer this message
 
                                let taken_message = scheduled.ctx_fancy.inbox_messages.remove(message_idx);
 
                                new_connector.ctx_fancy.inbox_messages.push(taken_message);
 
                            } else {
 
                                message_idx += 1;
 
                            }
 
                        }
 

	
 
                // Schedule new connector to run
 
                self.runtime.push_work(new_key);
 
                        // Transfer the port itself
 
                        let port_index = scheduled.ctx_fancy.ports.iter()
 
                            .position(|v| v.self_id == *port_id)
 
                            .unwrap();
 
                        let port = scheduled.ctx_fancy.ports.remove(port_index);
 
                        new_connector.ctx_fancy.ports.push(port.clone());
 

	
 
                        // Notify the peer that the port has changed
 
                        let reroute_message = scheduled.router.prepare_reroute(
 
                            port.self_id, port.peer_id, scheduled.ctx_fancy.id,
 
                            port.peer_connector, new_connector.ctx_fancy.id
 
                        );
 

	
 
                        self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
 
                        self.runtime.send_message(port.peer_connector, reroute_message);
 
                    }
 

	
 
                    // Schedule new connector to run
 
                    self.runtime.push_work(new_key);
 
                },
 
                ComponentStateChange::CreatedPort(port) => {
 
                    scheduled.ctx_fancy.ports.push(port);
 
                },
 
                ComponentStateChange::ChangedPort(port_change) => {
 
                    if port_change.is_acquired {
 
                        scheduled.ctx_fancy.ports.push(port_change.port);
 
                    } else {
 
                        let index = scheduled.ctx_fancy.ports
 
                            .iter()
 
                            .position(|v| v.self_id == port_change.port.self_id)
 
                            .unwrap();
 
                        scheduled.ctx_fancy.ports.remove(index);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        debug_assert!(delta_state.outbox.is_empty());
 
        debug_assert!(delta_state.new_ports.is_empty());
 
        debug_assert!(delta_state.new_connectors.is_empty());
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx_fancy.changed_in_sync {
 
            if scheduled.ctx_fancy.is_in_sync {
 
                // Just entered sync region
 
            } else {
 
                // Just left sync region. So clear inbox
 
                scheduled.ctx_fancy.inbox_messages.clear();
 
                scheduled.ctx_fancy.inbox_len_read = 0;
 
            }
 

	
 
            scheduled.ctx_fancy.changed_in_sync = false; // reset flag
 
        }
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.context.id.0);
 
        debug_assert_eq!(connector_key.index, connector.ctx_fancy.id.0);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
        connector.public.sleeping.store(true, Ordering::Release);
 

	
 
        // But do to reordering we might have received messages from peers who
 
        // But due to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up
 
            // Try to wake ourselves up (needed because someone might be trying
 
            // the exact same atomic compare-and-swap at this point in time)
 
            let should_wake_up_again = connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 
@@ -378,14 +391,9 @@ enum ComponentStateChange {
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) enum ComponentPortChange {
 
    Acquired(Port),
 
    Released(Port),
 
}
 

	
 
struct InboxMessage {
 
    target_port: PortIdLocal,
 
    data: DataMessage,
 
pub(crate) struct ComponentPortChange {
 
    pub is_acquired: bool, // otherwise: released
 
    pub port: Port,
 
}
 

	
 
/// The component context (better name may be invented). This was created
 
@@ -395,23 +403,43 @@ struct InboxMessage {
 
/// scheduler need to be exchanged.
 
pub(crate) struct ComponentCtxFancy {
 
    // Mostly managed by the scheduler
 
    id: ConnectorId,
 
    pub(crate) id: ConnectorId,
 
    ports: Vec<Port>,
 
    inbox_messages: Vec<InboxMessage>,
 
    inbox_messages: Vec<Message>, // never control or ping messages
 
    inbox_len_read: usize,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: Vec<MessageContents>,
 
    state_changes: Vec<ComponentStateChange>
 
    outbox: VecDeque<MessageContents>,
 
    state_changes: VecDeque<ComponentStateChange>
 
}
 

	
 
pub(crate) enum ReceivedMessage {
 
    Data((PortIdLocal, DataMessage)),
 
    Sync(SyncMessage),
 
    RequestCommit(SolutionMessage),
 
    ConfirmCommit(SolutionMessage),
 
}
 

	
 
impl ComponentCtxFancy {
 
    pub(crate) fn new_empty() -> Self {
 
        return Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
            inbox_messages: Vec::new(),
 
            inbox_len_read: 0,
 
            is_in_sync: false,
 
            changed_in_sync: false,
 
            outbox: VecDeque::new(),
 
            state_changes: VecDeque::new(),
 
        };
 
    }
 

	
 
    /// Notify the runtime that the component has created a new component. May
 
    /// only be called outside of a sync block.
 
    pub(crate) fn push_component(&mut self, component: ConnectorPDL) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push(ComponentStateChange::CreatedComponent(component));
 
        self.state_changes.push_back(ComponentStateChange::CreatedComponent(component));
 
    }
 

	
 
    /// Notify the runtime that the component has created a new port. May only
 
@@ -419,10 +447,21 @@ impl ComponentCtxFancy {
 
    /// block, pass them when calling `notify_sync_end`).
 
    pub(crate) fn push_port(&mut self, port: Port) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push(ComponentStateChange::CreatedPort(port))
 
        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
 
    }
 

	
 
    pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.self_id == id);
 
    }
 

	
 
    /// Notify that component will enter a sync block.
 
    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
 
        return self.ports.iter_mut().find(|v| v.self_id == id);
 
    }
 

	
 
    /// Notify that component will enter a sync block. Note that after calling
 
    /// this function you must allow the scheduler to pick up the changes in
 
    /// the context by exiting your `Component::run` function with an
 
    /// appropriate scheduling value.
 
    pub(crate) fn notify_sync_start(&mut self) -> &[Port] {
 
        debug_assert!(!self.is_in_sync);
 

	
 
@@ -431,14 +470,20 @@ impl ComponentCtxFancy {
 
        return &self.ports
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        return self.is_in_sync;
 
    }
 

	
 
    /// Submit a message for the scheduler to send to the appropriate receiver.
 
    /// May only be called inside of a sync block.
 
    pub(crate) fn submit_message(&mut self, contents: MessageContents) {
 
        debug_assert!(self.is_in_sync);
 
        self.outbox.push(contents);
 
        self.outbox.push_back(contents);
 
    }
 

	
 
    /// Notify that component just finished a sync block.
 
    /// Notify that component just finished a sync block. Like
 
    /// `notify_sync_start`: drop out of the `Component::Run` function.
 
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
 
        debug_assert!(self.is_in_sync);
 

	
 
@@ -447,26 +492,15 @@ impl ComponentCtxFancy {
 

	
 
        self.state_changes.reserve(changed_ports.len());
 
        for changed_port in changed_ports {
 
            self.state_changes.push(ComponentStateChange::ChangedPort(changed_port.clone()));
 
            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
 
        }
 
    }
 

	
 
    /// Inserts message into inbox. Generally only called by scheduler.
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, data: DataMessage) {
 
        debug_assert!(!self.inbox_messages.iter().any(|v| {
 
            v.target_port == target_port &&
 
                v.data.sender_prev_branch_id == data.sender_prev_branch_id &&
 
                v.data.sender_cur_branch_id == data.sender_cur_branch_id
 
        }));
 

	
 
        self.inbox_messages.push(InboxMessage{ target_port, data })
 
    }
 

	
 
    /// Retrieves messages matching a particular port and branch id. But only
 
    /// those messages that have been previously received with
 
    /// `read_next_message`.
 
    pub(crate) fn get_read_messages(&self, match_port_id: PortIdLocal, match_prev_branch_id: BranchId) -> MessagesIter {
 
        return MessageIter {
 
    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal, match_prev_branch_id: BranchId) -> MessagesIter {
 
        return MessagesIter {
 
            messages: &self.inbox_messages,
 
            next_index: 0,
 
            max_index: self.inbox_len_read,
 
@@ -476,47 +510,62 @@ impl ComponentCtxFancy {
 

	
 
    /// Retrieves the next unread message from the inbox `None` if there are no
 
    /// (new) messages to read.
 
    pub(crate) fn read_next_message(&mut self) -> Option<(&PortIdLocal, &DataMessage)> {
 
        if self.inbox_len_read == self.inbox_messages.len() {
 
            return None;
 
        }
 
    // TODO: Fix the clone of the data message, entirely unnecessary
 
    pub(crate) fn read_next_message(&mut self) -> Option<ReceivedMessage> {
 
        if !self.is_in_sync { return None; }
 
        if self.inbox_len_read == self.inbox_messages.len() { return None; }
 

	
 
        let message = &self.inbox_messages[self.inbox_len_read];
 
        self.inbox_len_read += 1;
 
        return Some((&message.target_port, &message.data))
 
        if let MessageContents::Data(contents) = &message.contents {
 
            self.inbox_len_read += 1;
 
            return Some(ReceivedMessage::Data((message.receiving_port, contents.clone())));
 
        } else {
 
            // Must be a sync/solution message
 
            let message = self.inbox_messages.remove(self.inbox_len_read);
 
            return match message.contents {
 
                MessageContents::Sync(v) => Some(ReceivedMessage::Sync(v)),
 
                MessageContents::RequestCommit(v) => Some(ReceivedMessage::RequestCommit(v)),
 
                MessageContents::ConfirmCommit(v) => Some(ReceivedMessage::ConfirmCommit(v)),
 
                _ => unreachable!(), // because we only put data/synclike messages in the inbox
 
            }
 
        }
 
    }
 
}
 

	
 
pub(crate) struct MessagesIter<'a> {
 
    messages: &'a [InboxMessage],
 
    messages: &'a [Message],
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
    match_prev_branch_id: BranchId,
 
}
 

	
 
impl Iterator for MessagesIter {
 
    type Item = DataMessage;
 
impl<'a> Iterator for MessagesIter<'a> {
 
    type Item = &'a DataMessage;
 

	
 
    fn next(&mut self) -> Option<&Self::Item> {
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let message = &self.messages[self.next_index];
 
            if message.target_port == self.match_port_id && message.data.sender_prev_branch_id == self.match_prev_branch_id {
 
                // Found a match
 
                break;
 
            if let MessageContents::Data(data_message) = &message.contents {
 
                if message.receiving_port == self.match_port_id && data_message.sender_prev_branch_id == self.match_prev_branch_id {
 
                    // Found a match
 
                    self.next_index += 1;
 
                    return Some(data_message);
 
                }
 
            } else {
 
                // Unreachable because:
 
                //  1. We only iterate over messages that were previously retrieved by `read_next_message`.
 
                //  2. Inbox does not contain control/ping messages.
 
                //  3. If `read_next_message` encounters anything else than a data message, it is removed from the inbox.
 
                unreachable!();
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        if self.next_index == self.max_index {
 
            return None;
 
        }
 

	
 
        let message = &self.messages[self.next_index];
 
        self.next_index += 1;
 
        return Some(&message.data);
 
        // No more messages
 
        return None;
 
    }
 
}
 

	
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -6,8 +6,8 @@ use crate::common::Id;
 
use crate::protocol::eval::*;
 

	
 
const NUM_THREADS: u32 = 4;     // number of threads in runtime
 
const NUM_INSTANCES: u32 = 10;   // number of test instances constructed
 
const NUM_LOOPS: u32 = 1;       // number of loops within a single test (not used by all tests)
 
const NUM_INSTANCES: u32 = 10;  // number of test instances constructed
 
const NUM_LOOPS: u32 = 10;      // number of loops within a single test (not used by all tests)
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
0 comments (0 inline, 0 general)