Changeset - a2b6b8e94778
[Not reviewed]
0 4 0
MH - 4 years ago 2021-11-07 15:10:22
contact@maxhenger.nl
initial rewrite of component using new ExecTree and Consensus
4 files changed with 245 insertions and 33 deletions:
0 comments (0 inline, 0 general)
src/runtime2/branch.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::{Index, IndexMut};
 

	
 
use crate::protocol::ComponentState;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::port::PortIdLocal;
 
use crate::protocol::eval::{Value, ValueGroup};
 
use crate::runtime2::port::{Port, PortIdLocal};
 

	
 
/// Generic branch ID. A component will always have one branch: the
 
/// non-speculative branch. This branch has ID 0. Hence in a speculative context
 
/// we use this fact to let branch ID 0 denote the ID being invalid.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct BranchId {
 
@@ -52,13 +52,14 @@ pub(crate) struct Branch {
 
    pub parent_id: BranchId,
 
    // Execution state
 
    pub code_state: ComponentState,
 
    pub sync_state: SpeculativeState,
 
    pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. TODO: Maybe put in enum
 
    pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue`
 
    pub inbox: HashMap<PortIdLocal, ValueGroup>; // TODO: Remove, currently only valid in single-get/put mode
 
    pub inbox: HashMap<PortIdLocal, ValueGroup>, // TODO: Remove, currently only valid in single-get/put mode
 
    pub prepared_channel: Option<(Value, Value)>, // TODO: Maybe remove?
 
}
 

	
 
impl Branch {
 
    /// Creates a new non-speculative branch
 
    pub(crate) fn new_non_sync(component_state: ComponentState) -> Self {
 
        Branch {
 
@@ -66,12 +67,13 @@ impl Branch {
 
            parent_id: BranchId::new_invalid(),
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            awaiting_port: PortIdLocal::new_invalid(),
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: HashMap::new(),
 
            prepared_channel: None,
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
 
@@ -86,12 +88,13 @@ impl Branch {
 
            parent_id: parent_branch.index,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: parent_branch.inbox.clone(),
 
            prepared_channel: None,
 
        }
 
    }
 

	
 
    /// Inserts a message into the branch for retrieval by a corresponding
 
    /// `get(port)` call.
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) {
 
@@ -207,12 +210,18 @@ impl ExecTree {
 
            let last_branch = &mut self.branches[queue.last.index as usize];
 
            last_branch.next_in_queue = id;
 
            queue.last = id;
 
        }
 
    }
 

	
 
    /// Returns the non-sync branch (TODO: better name?)
 
    pub fn base_branch_mut(&mut self) -> &mut Branch {
 
        debug_assert!(!self.is_in_sync());
 
        return &mut self.branches[0];
 
    }
 

	
 
    /// Returns an iterator over all the elements in the queue of the given kind
 
    pub fn iter_queue(&self, kind: QueueKind) -> BranchQueueIter {
 
        let queue = &self.queues[kind.as_index()];
 
        let index = queue.first as usize;
 
        return BranchQueueIter {
 
            branches: self.branches.as_slice(),
 
@@ -229,19 +238,21 @@ impl ExecTree {
 
        }
 
    }
 

	
 
    // --- Preparing and finishing a speculative round
 

	
 
    /// Starts a synchronous round by cloning the non-sync branch and marking it
 
    /// as the root of the speculative tree.
 
    pub fn start_sync(&mut self) {
 
    /// as the root of the speculative tree. The id of this root sync branch is
 
    /// returned.
 
    pub fn start_sync(&mut self) -> BranchId {
 
        debug_assert!(!self.is_in_sync());
 
        let sync_branch = Branch::new_sync(1, &self.branches[0]);
 
        let sync_branch_id = sync_branch.id;
 
        self.branches.push(sync_branch);
 
        self.push_into_queue(QueueKind::Runnable, sync_branch_id);
 

	
 
        return sync_branch_id;
 
    }
 

	
 
    /// Creates a new speculative branch based on the provided one. The index to
 
    /// retrieve this new branch will be returned.
 
    pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId {
 
        debug_assert!(self.is_in_sync());
src/runtime2/connector2.rs
Show inline comments
 
/// connector.rs
 
///
 
/// Represents a component. A component (and the scheduler that is running it)
 
/// has many properties that are not easy to subdivide into aspects that are
 
/// conceptually handled by particular data structures. That is to say: the code
 
/// that we run governs: running PDL code, keeping track of ports, instantiating
 
/// new components and transports (i.e. interacting with the runtime), running
 
/// a consensus algorithm, etc. But on the other hand, our data is rather
 
/// simple: we have a speculative execution tree, a set of ports that we own,
 
/// and a bit of code that we should run.
 
///
 
/// So currently the code is organized as following:
 
/// - The scheduler that is running the component is the authoritative source on
 
///     ports during *non-sync* mode. The consensus algorithm is the
 
///     authoritative source during *sync* mode. They retrieve each other's
 
///     state during the transitions. Hence port data exists duplicated between
 
///     these two datastructures.
 
/// - The execution tree is where executed branches reside. But the execution
 
///     tree is only aware of the tree shape itself (and keeps track of some
 
///     queues of branches that are in a particular state), and tends to store
 
///     the PDL program state. The consensus algorithm is also somewhat aware
 
///     of the execution tree, but only in terms of what is needed to complete
 
///     a sync round (for now, that means the port mapping in each branch).
 
///     Hence once more we have properties conceptually associated with branches
 
///     in two places.
 
/// - TODO: Write about handling messages, consensus wrapping data
 
/// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx
 

	
 
use std::sync::atomic::AtomicBool;
 
use crate::common::ComponentState;
 

	
 
use crate::PortId;
 
use crate::protocol::eval::{Value, ValueGroup};
 
use crate::common::ComponentState;
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 
use crate::runtime2::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState};
 
use crate::runtime2::connector::ConnectorScheduling;
 
use crate::runtime2::consensus::{Consensus, Consistency};
 
use crate::runtime2::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy};
 
use crate::runtime2::inbox::PublicInbox;
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::PortIdLocal;
 
use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx};
 
use crate::runtime2::consensus::find_ports_in_value_group;
 
use crate::runtime2::port::PortKind;
 

	
 
use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency};
 
use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy};
 
use super::inbox::PublicInbox;
 
use super::native::Connector;
 
use super::port::PortIdLocal;
 
use super::scheduler::{ComponentCtxFancy, SchedulerCtx};
 

	
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
@@ -23,19 +54,26 @@ impl ConnectorPublic {
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(initialize_as_sleeping),
 
        }
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
 
    Later,          // Schedule for running, at some later point in time
 
    NotNow,         // Do not reschedule for running
 
    Exit,          // Connector has exited
 
}
 

	
 
pub(crate) struct ConnectorPDL {
 
    tree: ExecTree,
 
    consensus: Consensus,
 
    branch_workspace: Vec<BranchId>,
 
}
 

	
 
struct ConnectorRunContext {};
 
struct ConnectorRunContext {}
 
impl RunContext for ConnectorRunContext{
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        todo!()
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
@@ -77,17 +115,17 @@ impl ConnectorPDL {
 
        }
 
    }
 

	
 
    pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        debug_assert!(self.branch_workspace.is_empty());
 
        debug_assert!(ctx.workspace_branches.is_empty());
 
        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
 
        self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut self.branch_workspace);
 
        self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut ctx.workspace_branches);
 

	
 
        for branch_id in self.branch_workspace.drain(..) {
 
        for branch_id in ctx.workspace_branches.drain(..) {
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
 
@@ -104,12 +142,13 @@ impl ConnectorPDL {
 
    }
 

	
 
    // --- Running code
 

	
 
    pub fn run_in_sync_mode(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        // Check if we have any branch that needs running
 
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        // Retrieve the branch and run it
 
@@ -172,13 +211,13 @@ impl ConnectorPDL {
 
                            // fork-and-receive dance
 
                            let recv_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[recv_branch_id];
 
                            branch.insert_message(port_id, message.content.clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, recv_branch_id);
 
                            self.consensus.notify_of_received_message(recv_branch_id, &message.data_header);
 
                            self.consensus.notify_of_received_message(recv_branch_id, &message.data_header, &message.content);
 
                            self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id);
 

	
 
                            any_branch_received = true;
 
                        }
 
                    }
 

	
 
@@ -195,19 +234,25 @@ impl ConnectorPDL {
 
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
                } else if consistency == Consistency::Inconsistent {
 
                    branch.sync_state == SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchPut(port_id, contents) => {
 
            RunResult::BranchPut(port_id, content) => {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                if consistency == Consistency::Valid {
 
                    // `put()` is valid.
 
                    self.consensus.
 
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                    comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{
 
                        sync_header, data_header, content
 
                    }));
 

	
 
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                    return ConnectorScheduling::Immediate;
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
 
        }
 
@@ -217,7 +262,66 @@ impl ConnectorPDL {
 
        if self.tree.queue_is_empty(QueueKind::Runnable) {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());
 

	
 
        let branch = self.tree.base_branch_mut();
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{};
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                branch.sync_state = SpeculativeState::Finished;
 

	
 
                return ConnectorScheduling::Exit;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                let current_ports = comp_ctx.notify_sync_start();
 
                let sync_branch_id = self.tree.start_sync();
 
                self.consensus.start_sync(current_ports);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Note: we're relinquishing ownership of ports. But because
 
                // we are in non-sync mode the scheduler will handle and check
 
                // port ownership transfer.
 
                debug_assert!(comp_ctx.workspace_ports.is_empty());
 
                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);
 

	
 
                let new_state = ComponentState {
 
                    prompt: Prompt::new(
 
                        &sched_ctx.runtime.protocol_description.types,
 
                        &sched_ctx.runtime.protocol_description.heap,
 
                        definition_id, monomorph_idx, arguments
 
                    ),
 
                };
 
                let new_component = ConnectorPDL::new(new_state, comp_ctx.workspace_ports.clone());
 
                comp_ctx.push_component(new_component);
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewChannel => {
 
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
 
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
 
                branch.prepared_channel = Some((
 
                    Value::Input(PortId::new(putter.self_id.index)),
 
                    Value::Output(PortId::new(getter.self_id.index)),
 
                ));
 

	
 
                comp_ctx.push_port(putter);
 
                comp_ctx.push_port(getter);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/consensus.rs
Show inline comments
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::inbox2::{DataHeader, SyncHeader};
 
use crate::runtime2::port::PortIdLocal;
 
use crate::runtime2::port::{Port, PortIdLocal};
 
use crate::runtime2::scheduler::ComponentCtxFancy;
 
use super::inbox2::PortAnnotation;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
}
 
@@ -14,15 +14,18 @@ struct BranchAnnotation {
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
pub(crate) struct Consensus {
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>,
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
    Valid,
 
    Inconsistent,
 
@@ -35,25 +38,30 @@ impl Consensus {
 
            branch_annotations: Vec::new(),
 
        }
 
    }
 

	
 
    // --- Controlling sync round and branches
 

	
 
    /// Returns whether the consensus algorithm is running in sync mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return !self.branch_annotations.is_empty();
 
    }
 

	
 
    /// Sets up the consensus algorithm for a new synchronous round. The
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ports: &[PortIdLocal]) {
 
    pub fn start_sync(&mut self, ports: &[Port]) {
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(!self.highest_connector_id.is_valid());
 

	
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
        self.branch_annotations.push(BranchAnnotation{
 
            port_mapping: ports.iter()
 
                .map(|v| PortAnnotation{
 
                    port_id: *v,
 
                    port_id: v.self_id,
 
                    registered_id: None,
 
                    expected_firing: None,
 
                })
 
                .collect(),
 
        });
 
    }
 
@@ -124,22 +132,54 @@ impl Consensus {
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
    pub fn prepare_message(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, value: &ValueGroup) -> (SyncHeader, DataHeader) {
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 

	
 
        if cfg!(debug_assertions) {
 
            let branch = &self.branch_annotations[branch_id.index as usize];
 
            let port = branch.port_mapping.iter()
 
                .find(|v| v.port_id == source_port_id)
 
                .unwrap();
 
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
 
        }
 

	
 
        
 
        // Check for ports that are begin sent
 
        debug_assert!(self.workspace_ports.is_empty());
 
        find_ports_in_value_group(content, &mut self.workspace_ports);
 
        if !self.workspace_ports.is_empty() {
 
            todo!("handle sending ports");
 
            self.workspace_ports.clear();
 
        }
 

	
 
        let sync_header = SyncHeader{
 
            sending_component_id: ctx.id,
 
            highest_component_id: self.highest_connector_id,
 
        };
 

	
 
        // TODO: Handle multiple firings. Right now we just assign the current
 
        //  branch to the `None` value because we know we can only send once.
 
        debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none());
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 
        let data_header = DataHeader{
 
            expected_mapping: branch.port_mapping.clone(),
 
            target_port: port_info.peer_id,
 
            new_mapping: branch_id
 
        };
 

	
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == source_port_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch_id);
 
            }
 
        }
 

	
 
        return (sync_header, data_header);
 
    }
 

	
 
    pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
 
        todo!("should check IDs and maybe send sync messages");
 
    }
 

	
 
@@ -158,18 +198,28 @@ impl Consensus {
 
                    target_ids.push(branch.id);
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader) {
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) {
 
        debug_assert!(self.branch_can_receive(branch_id, data_header));
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == data_header.target_port {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(content, &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
                }
 

	
 
                return;
 
            }
 
        }
 

	
 
        // If here, then the branch didn't actually own the port? Means the
 
        // caller made a mistake
 
@@ -193,7 +243,50 @@ impl Consensus {
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    use crate::protocol::eval::Value;
 

	
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
 
                for prev_port in ports.iter() {
 
                    if *prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
@@ -408,14 +408,18 @@ pub(crate) struct ComponentCtxFancy {
 
    ports: Vec<Port>,
 
    inbox_messages: Vec<MessageFancy>, // never control or ping messages
 
    inbox_len_read: usize,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: VecDeque<MessageContents>,
 
    outbox: VecDeque<MessageFancy>,
 
    state_changes: VecDeque<ComponentStateChange>
 
    // Workspaces that may be used by components to (generally) prevent
 
    // allocations. Be a good scout and leave it empty after you've used it.
 
    pub workspace_ports: Vec<PortIdLocal>,
 
    pub workspace_branches: Vec<BranchId>,
 
}
 

	
 
pub(crate) enum ReceivedMessage {
 
    Data((PortIdLocal, DataMessage)),
 
    Sync(SyncMessage),
 
    RequestCommit(SolutionMessage),
 
@@ -475,13 +479,13 @@ impl ComponentCtxFancy {
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        return self.is_in_sync;
 
    }
 

	
 
    /// Submit a message for the scheduler to send to the appropriate receiver.
 
    /// May only be called inside of a sync block.
 
    pub(crate) fn submit_message(&mut self, contents: MessageContents) {
 
    pub(crate) fn submit_message(&mut self, contents: MessageFancy) {
 
        debug_assert!(self.is_in_sync);
 
        self.outbox.push_back(contents);
 
    }
 

	
 
    /// Notify that component just finished a sync block. Like
 
    /// `notify_sync_start`: drop out of the `Component::Run` function.
0 comments (0 inline, 0 general)