Changeset - 15b9bb47abdc
[Not reviewed]
0 4 0
mh - 4 years ago 2021-10-28 11:27:30
contact@maxhenger.nl
WIP on rewriting execution ctx to fix recv bug
4 files changed with 169 insertions and 13 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::scheduler::Scheduler;
 
use crate::runtime2::scheduler::{ComponentCtxFancy, Scheduler};
 

	
 
use super::ConnectorId;
 
use super::native::Connector;
 
use super::scheduler::{SchedulerCtx, ConnectorCtx};
 
use super::inbox::{
 
    PrivateInbox, PublicInbox,
 
    DataMessage, SyncMessage, SolutionMessage, Message, MessageContents,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::{Port, PortKind, PortIdLocal};
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
// TODO: Remove Debug derive
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct BranchId {
 
    pub index: u32,
 
}
 

	
 
impl BranchId {
 
    fn new_invalid() -> Self {
 
        Self{ index: 0 }
 
    }
 
@@ -389,166 +389,166 @@ impl<'a> RunContext for ConnectorRunContext<'a> {
 
            return Some(Value::Bool(mapping.num_times_fired != 0));
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        return self.prepared_channel.take();
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message.contents {
 
            MC::Data(content) => self.handle_data_message(message.receiving_port, content),
 
            MC::Sync(content) => self.handle_sync_message(content, ctx, delta_state),
 
            MC::RequestCommit(content) => self.handle_request_commit_message(content, ctx, delta_state),
 
            MC::ConfirmCommit(content) => self.handle_confirm_commit_message(content, ctx, delta_state),
 
            MC::Control(_) | MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            // Check for new messages we haven't seen before. If any of the
 
            // pending branches can accept the message, do so.
 
            while let Some((target_port_id, message)) = self.inbox.next_message() {
 
            while let Some((target_port_id, message)) = comp_ctx.read_next_message() {
 
                let mut branch_idx = self.sync_pending_get.first;
 
                while branch_idx != 0 {
 
                    let branch = &self.branches[branch_idx as usize];
 
                    let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0);
 

	
 
                    let target_port_index = self.ports.get_port_index(*target_port_id).unwrap();
 
                    let port_mapping = self.ports.get_port(branch_idx, target_port_index);
 

	
 
                    if branch.sync_state == SpeculativeState::HaltedAtBranchPoint &&
 
                        branch.halted_at_port == *target_port_id &&
 
                        port_mapping.last_registered_branch_id == message.sender_prev_branch_id {
 
                        // Branch may accept this mesage, so create a fork that
 
                        // contains this message in the inbox.
 
                        let new_branch_idx = self.branches.len() as u32;
 
                        let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch);
 

	
 
                        self.ports.prepare_sync_branch(branch_idx, new_branch_idx);
 
                        let mapping = self.ports.get_port_mut(branch_idx, target_port_index);
 
                        mapping.last_registered_branch_id = message.sender_cur_branch_id;
 

	
 
                        let new_branch_id = BranchId::new(new_branch_idx);
 
                        self.branches.push(new_branch);
 
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id)
 
                    }
 

	
 
                    branch_idx = next_branch_idx;
 
                }
 
            }
 

	
 
            let scheduling = self.run_in_speculative_mode(sched_ctx, conn_ctx, delta_state);
 
            let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx, conn_ctx, delta_state);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                // Retrieve first element in queue
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
                    next_id = self.sync_finished.first;
 
                } else {
 
                    let last_handled = &self.branches[self.sync_finished_last_handled as usize];
 
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                loop {
 
                    let branch_id = BranchId::new(next_id);
 
                    let branch = &self.branches[next_id as usize];
 
                    let branch_next = branch.next_branch_in_queue;
 

	
 
                    // Turn local solution into a message and send it along
 
                    // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, conn_ctx);
 
                    if let Some(valid_solution) = solution_message {
 
                        self.submit_sync_solution(valid_solution, conn_ctx, delta_state);
 
                    } else {
 
                        // Branch is actually invalid, but we only just figured
 
                        // it out. We need to mark it as invalid to prevent
 
                        // future use
 
                        Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id);
 
                        if branch_id.index == self.sync_finished_last_handled {
 
                            self.sync_finished_last_handled = self.sync_finished.last;
 
                        }
 

	
 
                        let branch = &mut self.branches[next_id as usize];
 
                        branch.sync_state = SpeculativeState::Inconsistent;
 
                    }
 

	
 
                    match branch_next {
 
                        Some(id) => next_id = id,
 
                        None => break,
 
                    }
 
                }
 

	
 
                self.sync_finished_last_handled = next_id;
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, conn_ctx, delta_state);
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx, conn_ctx, delta_state);
 
            return scheduling;
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    /// Constructs a representation of a connector. The assumption is that the
 
    /// initial branch is at the first instruction of the connector's code,
 
    /// hence is in a non-sync state.
 
    pub fn new(initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            sync_finished_last_handled: 0, // none at all
 
            cur_round: 0,
 
            committed_to: None,
 
            inbox: PrivateInbox::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
        }
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling connector messages
 
    // -------------------------------------------------------------------------
 

	
 
    pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
 
        self.inbox.insert_message(target_port, message);
 
        // self.inbox.insert_message(target_port, message);
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate.
 
    pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints
 

	
 
        // TODO: Optimize, use some kind of temp workspace vector
 
        let mut execution_path_branch_ids = Vec::new();
 

	
 
        if self.sync_finished_last_handled != 0 {
 
            // We have some solutions to match against
 
            let constraints_index = message.constraints
 
                .iter()
 
                .position(|v| v.connector_id == ctx.id)
 
                .unwrap();
 
            let constraints = &message.constraints[constraints_index].constraints;
 
            debug_assert!(!constraints.is_empty());
 

	
 
            // Note that we only iterate over the solutions we've already
 
            // handled ourselves, not necessarily
 
            let mut branch_index = self.sync_finished.first;
 
            'branch_loop: loop {
 
@@ -743,49 +743,49 @@ impl ConnectorPDL {
 
        self.committed_to = None;
 
        self.inbox.clear();
 
        self.ports.commit_to_sync();
 

	
 
        // Add/remove any of the ports we lost during the sync phase
 
        for port_delta in &solution.ports_delta {
 
            if port_delta.acquired {
 
                self.ports.add_port(port_delta.port_id);
 
            } else {
 
                self.ports.remove_port(port_delta.port_id);
 
            }
 
        }
 
        solution.commit_to_sync();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Executing connector code
 
    // -------------------------------------------------------------------------
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 

	
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        debug_assert!(branch.prepared_channel.is_none());
 
        let mut run_context = ConnectorRunContext {
 
            branch_index: branch.index.index,
 
            ports: &self.ports,
 
            ports_delta: &branch.ports_delta,
 
            scheduler: sched_ctx,
 
            prepared_channel: None,
 
            received: &branch.received,
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        // Match statement contains `return` statements only if the particular
 
        // run result behind handled requires an immediate re-run of the
 
        // connector.
 
        match run_result {
 
@@ -843,49 +843,49 @@ impl ConnectorPDL {
 
                }
 
                let local_port_index = local_port_index.unwrap();
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 

	
 
                // Check for port mapping assignment and, if present, if it is
 
                // consistent
 
                let is_valid_get = if port_mapping.is_assigned {
 
                    assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires`
 
                    port_mapping.num_times_fired == 1
 
                } else {
 
                    // Not yet assigned
 
                    port_mapping.mark_speculative(1);
 
                    true
 
                };
 

	
 
                if is_valid_get {
 
                    // Mark as a branching point for future messages
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    branch.halted_at_port = local_port_id;
 
                    let branch_id = branch.index;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id);
 

	
 
                    // But if some messages can be immediately applied, do so
 
                    // now.
 
                    let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    let messages = comp_ctx.get_read_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    let mut did_have_messages = false;
 

	
 
                    for message in messages {
 
                        did_have_messages = true;
 

	
 
                        // For each message prepare a new branch to execute
 
                        let parent_branch = &self.branches[branch_id.index as usize];
 
                        let new_branch_index = self.branches.len() as u32;
 
                        let mut new_branch = Branch::new_sync_branching_from(new_branch_index, parent_branch);
 
                        self.ports.prepare_sync_branch(branch_id.index, new_branch_index);
 

	
 
                        let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index);
 
                        port_mapping.last_registered_branch_id = message.sender_cur_branch_id;
 
                        debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1);
 

	
 
                        new_branch.received.insert(local_port_id, message.clone());
 

	
 
                        // If the message contains any ports then they will now
 
                        // be owned by the new branch
 
                        debug_assert!(results.ports.is_empty());
 
                        find_ports_in_value_group(&message.message, &mut results.ports);
 
                        Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports);
 
                        results.ports.clear();
 

	
 
@@ -965,49 +965,49 @@ impl ConnectorPDL {
 
                    results.ports.clear();
 

	
 
                    results.outbox.push(MessageContents::Data(message));
 

	
 
                    let branch_index = branch.index;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, branch_index);
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result),
 
        }
 

	
 
        // Not immediately scheduling, so schedule again if there are more
 
        // branches to run
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_index: branch.index.index,
 
            ports: &self.ports,
 
            ports_delta: &branch.ports_delta,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
            received: &branch.received,
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                // Need to wait until all children are terminated
 
                // TODO: Think about how to do this?
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::Exit;
 
            },
src/runtime2/native.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::Ordering;
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::scheduler::ComponentCtxFancy;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx};
 
use super::scheduler::SchedulerCtx;
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::connector::{Branch, ConnectorScheduling, RunDeltaState, ConnectorPDL};
 
use super::connector::find_ports_in_value_group;
 
use super::inbox::{Message, MessageContents};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
    /// Handle a new message (preprocessed by the scheduler). You probably only
 
    /// want to handle `Data`, `Sync`, and `Solution` messages. The others are
 
    /// intended for the scheduler itself.
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 

	
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL),
 
    Shutdown,
 
}
 

	
 
/// The connector which an application can directly interface with. Once may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
pub struct ConnectorApplication {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn handle_message(&mut self, message: Message, _ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message.contents {
 
            MC::Data(_) => unreachable!("data message in API connector"),
 
            MC::Sync(_) | MC::RequestCommit(_) | MC::ConfirmCommit(_) => {
 
                // Handling sync in API
 
            },
 
            MC::Control(_) => {},
 
            MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, _sched_ctx: SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    println!("DEBUG: API adopting ports");
 
                    delta_state.new_ports.reserve(2);
 
                    delta_state.new_ports.push(endpoint_a);
 
                    delta_state.new_ports.push(endpoint_b);
 
                }
 
                ApplicationJob::NewConnector(connector) => {
 
                    println!("DEBUG: API creating connector");
 
                    delta_state.new_connectors.push(connector);
 
                },
 
                ApplicationJob::Shutdown => {
 
                    debug_assert!(queue.is_empty());
 
                    return ConnectorScheduling::Exit;
 
                }
 
            }
 
        }
 

	
 
        return ConnectorScheduling::NotNow;
 
    }
 
}
 

	
src/runtime2/port.rs
Show inline comments
 
@@ -16,39 +16,40 @@ impl PortIdLocal {
 
        Self{ index: u32::MAX }
 
    }
 

	
 
    pub fn is_valid(&self) -> bool {
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum PortState {
 
    Open,
 
    Closed,
 
}
 

	
 
/// Represents a port inside of the runtime. This is generally the local view of
 
/// a connector on its port, which may not be consistent with the rest of the
 
/// global system (e.g. its peer was moved to a new connector, or the peer might
 
/// have died in the meantime, so it is no longer usable).
 
#[derive(Clone)]
 
pub struct Port {
 
    pub self_id: PortIdLocal,
 
    pub peer_id: PortIdLocal,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase
 
}
 

	
 

	
 

	
 
// TODO: Turn port ID into its own type
 
pub struct Channel {
 
    pub putter_id: PortIdLocal, // can put on it, so from the connector's point of view, this is an output
 
    pub getter_id: PortIdLocal, // vice versa: can get on it, so an input for the connector
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::runtime2::ScheduledConnector;
 
use crate::runtime2::connector::{BranchId, ConnectorPDL};
 
use crate::runtime2::inbox::{DataMessage, PrivateInbox};
 

	
 
use super::{RuntimeInner, ConnectorId, ConnectorKey};
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::connector::{ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};
 

	
 
/// Contains fields that are mostly managed by the scheduler, but may be
 
/// accessed by the connector
 
pub(crate) struct ConnectorCtx {
 
    pub(crate) id: ConnectorId,
 
    pub(crate) ports: Vec<Port>,
 
}
 

	
 
impl ConnectorCtx {
 
    pub(crate) fn new() -> ConnectorCtx {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn add_port(&mut self, port: Port) {
 
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
 
        self.ports.push(port);
 
    }
 
@@ -345,48 +346,201 @@ impl Scheduler {
 
        // But do to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up
 
            let should_wake_up_again = connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
            if should_wake_up_again {
 
                self.runtime.push_work(connector_key)
 
            }
 
        }
 
    }
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentCtx
 
// -----------------------------------------------------------------------------
 

	
 
enum ComponentStateChange {
 
    CreatedComponent(ConnectorPDL),
 
    CreatedPort(Port),
 
    ChangedPort(ComponentPortChange),
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) enum ComponentPortChange {
 
    Acquired(Port),
 
    Released(Port),
 
}
 

	
 
struct InboxMessage {
 
    target_port: PortIdLocal,
 
    data: DataMessage,
 
}
 

	
 
/// The component context (better name may be invented). This was created
 
/// because part of the component's state is managed by the scheduler, and part
 
/// of it by the component itself. When the component starts a sync block or
 
/// exits a sync block the partially managed state by both component and
 
/// scheduler need to be exchanged.
 
pub(crate) struct ComponentCtxFancy {
 
    // Mostly managed by the scheduler
 
    id: ConnectorId,
 
    ports: Vec<Port>,
 
    inbox_messages: Vec<InboxMessage>,
 
    inbox_len_read: usize,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: Vec<MessageContents>,
 
    state_changes: Vec<ComponentStateChange>
 
}
 

	
 
impl ComponentCtxFancy {
 
    /// Notify the runtime that the component has created a new component. May
 
    /// only be called outside of a sync block.
 
    pub(crate) fn push_component(&mut self, component: ConnectorPDL) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push(ComponentStateChange::CreatedComponent(component));
 
    }
 

	
 
    /// Notify the runtime that the component has created a new port. May only
 
    /// be called outside of a sync block (for ports received during a sync
 
    /// block, pass them when calling `notify_sync_end`).
 
    pub(crate) fn push_port(&mut self, port: Port) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push(ComponentStateChange::CreatedPort(port))
 
    }
 

	
 
    /// Notify that component will enter a sync block.
 
    pub(crate) fn notify_sync_start(&mut self) -> &[Port] {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        self.is_in_sync = true;
 
        self.changed_in_sync = true;
 
        return &self.ports
 
    }
 

	
 
    /// Submit a message for the scheduler to send to the appropriate receiver.
 
    /// May only be called inside of a sync block.
 
    pub(crate) fn submit_message(&mut self, contents: MessageContents) {
 
        debug_assert!(self.is_in_sync);
 
        self.outbox.push(contents);
 
    }
 

	
 
    /// Notify that component just finished a sync block.
 
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.is_in_sync = false;
 
        self.changed_in_sync = true;
 

	
 
        self.state_changes.reserve(changed_ports.len());
 
        for changed_port in changed_ports {
 
            self.state_changes.push(ComponentStateChange::ChangedPort(changed_port.clone()));
 
        }
 
    }
 

	
 
    /// Inserts message into inbox. Generally only called by scheduler.
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, data: DataMessage) {
 
        debug_assert!(!self.inbox_messages.iter().any(|v| {
 
            v.target_port == target_port &&
 
                v.data.sender_prev_branch_id == data.sender_prev_branch_id &&
 
                v.data.sender_cur_branch_id == data.sender_cur_branch_id
 
        }));
 

	
 
        self.inbox_messages.push(InboxMessage{ target_port, data })
 
    }
 

	
 
    /// Retrieves messages matching a particular port and branch id. But only
 
    /// those messages that have been previously received with
 
    /// `read_next_message`.
 
    pub(crate) fn get_read_messages(&self, match_port_id: PortIdLocal, match_prev_branch_id: BranchId) -> MessagesIter {
 
        return MessageIter {
 
            messages: &self.inbox_messages,
 
            next_index: 0,
 
            max_index: self.inbox_len_read,
 
            match_port_id, match_prev_branch_id
 
        };
 
    }
 

	
 
    /// Retrieves the next unread message from the inbox `None` if there are no
 
    /// (new) messages to read.
 
    pub(crate) fn read_next_message(&mut self) -> Option<(&PortIdLocal, &DataMessage)> {
 
        if self.inbox_len_read == self.inbox_messages.len() {
 
            return None;
 
        }
 

	
 
        let message = &self.inbox_messages[self.inbox_len_read];
 
        self.inbox_len_read += 1;
 
        return Some((&message.target_port, &message.data))
 
    }
 
}
 

	
 
pub(crate) struct MessagesIter<'a> {
 
    messages: &'a [InboxMessage],
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
    match_prev_branch_id: BranchId,
 
}
 

	
 
impl Iterator for MessagesIter {
 
    type Item = DataMessage;
 

	
 
    fn next(&mut self) -> Option<&Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let message = &self.messages[self.next_index];
 
            if message.target_port == self.match_port_id && message.data.sender_prev_branch_id == self.match_prev_branch_id {
 
                // Found a match
 
                break;
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        if self.next_index == self.max_index {
 
            return None;
 
        }
 

	
 
        let message = &self.messages[self.next_index];
 
        self.next_index += 1;
 
        return Some(&message.data);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
struct ControlEntry {
 
    id: u32,
 
    variant: ControlVariant,
 
}
 

	
 
enum ControlVariant {
 
    ChangedPort(ControlChangedPort),
 
    ClosedChannel(ControlClosedChannel),
 
}
 

	
 
struct ControlChangedPort {
 
    target_port: PortIdLocal,       // if send to this port, then reroute
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector we need to reroute to
 
}
 

	
 
struct ControlClosedChannel {
 
    source_port: PortIdLocal,
 
    target_port: PortIdLocal,
 
}
0 comments (0 inline, 0 general)