Changeset - 088be7630245
[Not reviewed]
0 9 0
mh - 4 years ago 2021-11-14 15:34:10
contact@maxhenger.nl
Implement multi-put and multi-get
9 files changed with 206 insertions and 173 deletions:
0 comments (0 inline, 0 general)
src/protocol/eval/executor.rs
Show inline comments
 
@@ -585,7 +585,7 @@ impl Prompt {
 
                                        unreachable!("executor calling 'get' on value {:?}", value)
 
                                    };
 

	
 
                                    match ctx.get(port_id) {
 
                                    match ctx.performed_get(port_id) {
 
                                        Some(result) => {
 
                                            // We have the result. Merge the `ValueGroup` with the
 
                                            // stack/heap storage.
 
@@ -614,11 +614,12 @@ impl Prompt {
 
                                    let msg_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let deref_msg_value = self.store.maybe_read_ref(&msg_value).clone();
 

	
 
                                    if ctx.did_put(port_id) {
 
                                    if ctx.performed_put(port_id) {
 
                                        // We're fine, deallocate in case the expression value stack
 
                                        // held an owned value
 
                                        self.store.drop_value(msg_value.get_heap_pos());
 
                                    } else {
 
                                        // Prepare to execute again
 
                                        cur_frame.expr_values.push_front(msg_value);
 
                                        cur_frame.expr_values.push_front(port_value);
 
                                        cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
@@ -811,7 +812,7 @@ impl Prompt {
 
                    LocalStatement::Channel(stmt) => {
 
                        // Need to create a new channel by requesting it from
 
                        // the runtime.
 
                        match ctx.get_channel() {
 
                        match ctx.created_channel() {
 
                            None => {
 
                                // No channel is pending. So request one
 
                                Ok(EvalContinuation::NewChannel)
 
@@ -893,7 +894,7 @@ impl Prompt {
 
                    cur_frame.position = stmt.left_body.upcast();
 
                } else {
 
                    // Need to fork
 
                    if let Some(go_left) = ctx.get_fork() {
 
                    if let Some(go_left) = ctx.performed_fork() {
 
                        // Runtime has created a fork
 
                        if go_left {
 
                            cur_frame.position = stmt.left_body.upcast();
src/protocol/mod.rs
Show inline comments
 
@@ -284,11 +284,11 @@ impl ProtocolDescription {
 

	
 
// TODO: @temp Should just become a concrete thing that is passed in
 
pub trait RunContext {
 
    fn did_put(&mut self, port: PortId) -> bool;
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup>; // None if still waiting on message
 
    fn performed_put(&mut self, port: PortId) -> bool;
 
    fn performed_get(&mut self, port: PortId) -> Option<ValueGroup>; // None if still waiting on message
 
    fn fires(&mut self, port: PortId) -> Option<Value>; // None if not yet branched
 
    fn get_fork(&mut self) -> Option<bool>; // None if not yet forked
 
    fn get_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared
 
    fn performed_fork(&mut self) -> Option<bool>; // None if not yet forked
 
    fn created_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared
 
}
 

	
 
#[derive(Debug)]
 
@@ -301,7 +301,7 @@ pub enum RunResult {
 
    // Can only occur inside sync blocks
 
    BranchInconsistent, // branch has inconsistent behaviour
 
    BranchMissingPortState(PortId), // branch doesn't know about port firing
 
    BranchMissingPortValue(PortId), // branch hasn't received message on input port yet
 
    BranchGet(PortId), // branch hasn't received message on input port yet
 
    BranchAtSyncEnd,
 
    BranchFork,
 
    BranchPut(PortId, ValueGroup),
 
@@ -334,7 +334,7 @@ impl ComponentState {
 
                    EC::NewFork =>
 
                        return RR::BranchFork,
 
                    EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id),
 
                    EC::BlockGet(port_id) => return RR::BranchMissingPortValue(port_id),
 
                    EC::BlockGet(port_id) => return RR::BranchGet(port_id),
 
                    EC::Put(port_id, value) => {
 
                        let value_group = ValueGroup::from_store(&self.prompt.store, &[value]);
 
                        return RR::BranchPut(port_id, value_group);
 
@@ -469,7 +469,7 @@ impl ComponentState {
 
}
 

	
 
impl RunContext for EvalContext<'_> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
    fn performed_put(&mut self, port: PortId) -> bool {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
@@ -479,7 +479,7 @@ impl RunContext for EvalContext<'_> {
 
        }
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
    fn performed_get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
@@ -518,7 +518,7 @@ impl RunContext for EvalContext<'_> {
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
@@ -531,7 +531,7 @@ impl RunContext for EvalContext<'_> {
 
        }
 
    }
 

	
 
    fn get_fork(&mut self) -> Option<bool> {
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        // Never actually used in the old runtime
 
        return None;
 
    }
src/runtime2/branch.rs
Show inline comments
 
@@ -52,6 +52,35 @@ pub(crate) enum SpeculativeState {
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum PreparedStatement {
 
    CreatedChannel((Value, Value)),
 
    ForkedExecution(bool),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    None,
 
}
 

	
 
impl PreparedStatement {
 
    pub(crate) fn is_none(&self) -> bool {
 
        if let PreparedStatement::None = self {
 
            return true;
 
        } else {
 
            return false;
 
        }
 
    }
 

	
 
    pub(crate) fn take(&mut self) -> PreparedStatement {
 
        if let PreparedStatement::None = self {
 
            return PreparedStatement::None;
 
        } else {
 
            let mut replacement = PreparedStatement::None;
 
            std::mem::swap(self, &mut replacement);
 
            return replacement;
 
        }
 
    }
 
}
 

	
 
/// The execution state of a branch. This envelops the PDL code and the
 
/// execution state. And derived from that: if we're ready to keep running the
 
/// code, or if we're halted for some reason (e.g. waiting for a message).
 
@@ -63,9 +92,7 @@ pub(crate) struct Branch {
 
    pub sync_state: SpeculativeState,
 
    pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. TODO: Maybe put in enum
 
    pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue`
 
    pub inbox: HashMap<PortIdLocal, ValueGroup>, // TODO: Remove, currently only valid in single-get/put mode
 
    pub prepared_channel: Option<(Value, Value)>, // TODO: Maybe remove?
 
    pub prepared_fork: Option<bool>, // TODO: See above
 
    pub prepared: PreparedStatement,
 
}
 

	
 
impl BranchListItem for Branch {
 
@@ -84,9 +111,7 @@ impl Branch {
 
            sync_state: SpeculativeState::RunningNonSync,
 
            awaiting_port: PortIdLocal::new_invalid(),
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: HashMap::new(),
 
            prepared_channel: None,
 
            prepared_fork: None,
 
            prepared: PreparedStatement::None,
 
        }
 
    }
 

	
 
@@ -97,8 +122,7 @@ impl Branch {
 
        //     (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
 
        //     (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        // ); // forking from non-sync, or forking from a branching point
 
        debug_assert!(parent_branch.prepared_channel.is_none());
 
        debug_assert!(parent_branch.prepared_fork.is_none());
 
        debug_assert!(parent_branch.prepared.is_none());
 

	
 
        Branch {
 
            id: BranchId::new(new_index),
 
@@ -107,20 +131,9 @@ impl Branch {
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: parent_branch.inbox.clone(),
 
            prepared_channel: None,
 
            prepared_fork: None,
 
            prepared: PreparedStatement::None,
 
        }
 
    }
 

	
 
    /// Inserts a message into the branch for retrieval by a corresponding
 
    /// `get(port)` call.
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) {
 
        debug_assert!(target_port.is_valid());
 
        debug_assert!(self.awaiting_port == target_port);
 
        self.awaiting_port = PortIdLocal::new_invalid();
 
        self.inbox.insert(target_port, contents);
 
    }
 
}
 

	
 
/// Queue of branches. Just a little helper.
 
@@ -295,8 +308,7 @@ impl ExecTree {
 
        branch.sync_state = SpeculativeState::RunningNonSync;
 
        debug_assert!(!branch.awaiting_port.is_valid());
 
        branch.next_in_queue = BranchId::new_invalid();
 
        branch.inbox.clear();
 
        debug_assert!(branch.prepared_channel.is_none());
 
        debug_assert!(branch.prepared.is_none());
 

	
 
        // Clear out all the queues
 
        for queue_idx in 0..NUM_QUEUES {
src/runtime2/connector.rs
Show inline comments
 
@@ -33,6 +33,7 @@ use crate::PortId;
 
use crate::common::ComponentState;
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 
use crate::runtime2::branch::PreparedStatement;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
@@ -69,28 +70,28 @@ pub(crate) struct ConnectorPDL {
 
    last_finished_handled: Option<BranchId>,
 
}
 

	
 
// TODO: Remove remaining fields once 'fires()' is removed from language.
 
struct ConnectorRunContext<'a> {
 
    branch_id: BranchId,
 
    consensus: &'a Consensus,
 
    received: &'a HashMap<PortIdLocal, ValueGroup>,
 
    scheduler: SchedulerCtx<'a>,
 
    prepared_channel: Option<(Value, Value)>,
 
    prepared_fork: Option<bool>,
 
    prepared: PreparedStatement,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a>{
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.registered_id.is_some();
 
    fn performed_put(&mut self, _port: PortId) -> bool {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => false,
 
            PreparedStatement::PerformedPut => true,
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken)
 
        };
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        match self.received.get(&port_id) {
 
            Some(data) => Some(data.clone()),
 
            None => None,
 
        }
 
    fn performed_get(&mut self, _port: PortId) -> Option<ValueGroup> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::PerformedGet(value) => Some(value),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken),
 
        };
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
@@ -99,12 +100,20 @@ impl<'a> RunContext for ConnectorRunContext<'a>{
 
        return annotation.expected_firing.map(|v| Value::Bool(v));
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        return self.prepared_channel.take();
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::CreatedChannel(ports) => Some(ports),
 
            taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken),
 
        };
 
    }
 

	
 
    fn get_fork(&mut self) -> Option<bool> {
 
        return self.prepared_fork.take();
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::ForkedExecution(path) => Some(path),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken),
 
        };
 
    }
 
}
 

	
 
@@ -181,7 +190,9 @@ impl ConnectorPDL {
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port);
 
            receiving_branch.awaiting_port = PortIdLocal::new_invalid();
 
            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 

	
 
            // And prepare the branch for running
 
@@ -212,10 +223,7 @@ impl ConnectorPDL {
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
            prepared_fork: branch.prepared_fork.take(),
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
@@ -251,43 +259,38 @@ impl ConnectorPDL {
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
            RunResult::BranchGet(port_id) => {
 
                // Branch performed a `get()` on a port that does not have a
 
                // received message on that port.
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                if consistency == Consistency::Valid {
 
                    // `get()` is valid, so mark the branch as awaiting a message
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    branch.awaiting_port = port_id;
 
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                    // Note: we only know that a branch is waiting on a message when
 
                    // it reaches the `get` call. But we might have already received
 
                    // a message that targets this branch, so check now.
 
                    let mut any_message_received = false;
 
                    for message in comp_ctx.get_read_data_messages(port_id) {
 
                        if self.consensus.branch_can_receive(branch_id, &message) {
 
                            // This branch can receive the message, so we do the
 
                            // fork-and-receive dance
 
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[receiving_branch_id];
 

	
 
                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                            any_message_received = true;
 
                        }
 
                    }
 

	
 
                    if any_message_received {
 
                        return ConnectorScheduling::Immediate;
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                branch.awaiting_port = port_id;
 
                self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                // Note: we only know that a branch is waiting on a message when
 
                // it reaches the `get` call. But we might have already received
 
                // a message that targets this branch, so check now.
 
                let mut any_message_received = false;
 
                for message in comp_ctx.get_read_data_messages(port_id) {
 
                    if self.consensus.branch_can_receive(branch_id, &message) {
 
                        // This branch can receive the message, so we do the
 
                        // fork-and-receive dance
 
                        let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                        let branch = &mut self.tree[receiving_branch_id];
 
                        branch.awaiting_port = PortIdLocal::new_invalid();
 
                        branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 

	
 
                        self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                        self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                        self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                        any_message_received = true;
 
                    }
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 

	
 
                if any_message_received {
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 
            RunResult::BranchAtSyncEnd => {
 
@@ -310,27 +313,22 @@ impl ConnectorPDL {
 
                self.tree.push_into_queue(QueueKind::Runnable, right_id);
 

	
 
                let left_branch = &mut self.tree[left_id];
 
                left_branch.prepared_fork = Some(true);
 
                left_branch.prepared = PreparedStatement::ForkedExecution(true);
 
                let right_branch = &mut self.tree[right_id];
 
                right_branch.prepared_fork = Some(false);
 
                right_branch.prepared = PreparedStatement::ForkedExecution(false);
 
            }
 
            RunResult::BranchPut(port_id, content) => {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                if consistency == Consistency::Valid {
 
                    // `put()` is valid.
 
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                    comp_ctx.submit_message(Message::Data(DataMessage {
 
                        sync_header, data_header,
 
                        content: DataContent::Message(content),
 
                    }));
 

	
 
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                    return ConnectorScheduling::Immediate;
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                comp_ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header, data_header,
 
                    content: DataContent::Message(content),
 
                }));
 

	
 
                branch.prepared = PreparedStatement::PerformedPut;
 
                self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
 
        }
 
@@ -353,10 +351,7 @@ impl ConnectorPDL {
 
        let mut run_context = ConnectorRunContext{
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
            prepared_fork: branch.prepared_fork.take(),
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
@@ -399,7 +394,7 @@ impl ConnectorPDL {
 
            RunResult::NewChannel => {
 
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
 
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
 
                branch.prepared_channel = Some((
 
                branch.prepared = PreparedStatement::CreatedChannel((
 
                    Value::Output(PortId::new(putter.self_id.index)),
 
                    Value::Input(PortId::new(getter.self_id.index)),
 
                ));
src/runtime2/consensus.rs
Show inline comments
 
use crate::collections::VecSet;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::inbox::BranchMarker;
 

	
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
@@ -14,19 +15,20 @@ use super::scheduler::ComponentCtx;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
    cur_marker: BranchMarker,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchMarker)>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct GlobalSolution {
 
    component_branches: Vec<(ConnectorId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
 
}
 

	
 
// -----------------------------------------------------------------------------
 
@@ -54,7 +56,8 @@ pub(crate) struct Consensus {
 
    // --- State that is cleared after each round
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>,
 
    branch_annotations: Vec<BranchAnnotation>, // index is branch ID
 
    branch_markers: Vec<BranchId>, // index is branch marker, maps to branch
 
    // Gathered state from communication
 
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
 
    solution_combiner: SolutionCombiner,
 
@@ -76,6 +79,7 @@ impl Consensus {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            branch_markers: Vec::new(),
 
            encountered_ports: VecSet::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            peers: Vec::new(),
 
@@ -116,7 +120,9 @@ impl Consensus {
 
                    expected_firing: None,
 
                })
 
                .collect(),
 
            cur_marker: BranchMarker::new_invalid(),
 
        });
 
        self.branch_markers.push(BranchId::new_invalid());
 

	
 
        self.highest_connector_id = ctx.id;
 

	
 
@@ -129,10 +135,13 @@ impl Consensus {
 
        // index is the length in `branch_annotations`.
 
        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
 
        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        let new_branch_annotations = BranchAnnotation{
 
            port_mapping: parent_branch_annotations.port_mapping.clone(),
 
            cur_marker: new_marker,
 
        };
 
        self.branch_annotations.push(new_branch_annotations);
 
        self.branch_markers.push(new_branch_id);
 
    }
 

	
 
    /// Notifies the consensus algorithm that a branch has reached the end of
 
@@ -214,7 +223,7 @@ impl Consensus {
 
                        expected_mapping: source_mapping.clone(),
 
                        sending_port: port.port_id,
 
                        target_port: peer_port_id,
 
                        new_mapping: BranchId::new_invalid(),
 
                        new_mapping: BranchMarker::new_invalid(),
 
                    },
 
                    content: DataContent::SilentPortNotification,
 
                }));
 
@@ -223,7 +232,7 @@ impl Consensus {
 

	
 
            target_mapping.push((
 
                channel_id,
 
                port.registered_id.unwrap_or(BranchId::new_invalid())
 
                port.registered_id.unwrap_or(BranchMarker::new_invalid())
 
            ));
 
        }
 

	
 
@@ -290,23 +299,27 @@ impl Consensus {
 
        // Construct data header
 
        // TODO: Handle multiple firings. Right now we just assign the current
 
        //  branch to the `None` value because we know we can only send once.
 
        debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none());
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 
        let data_header = DataHeader{
 
            expected_mapping: branch.port_mapping.clone(),
 
            sending_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch_id
 
            new_mapping: branch.cur_marker,
 
        };
 

	
 
        // Update port mapping
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == source_port_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch_id);
 
                mapping.registered_id = Some(branch.cur_marker);
 
            }
 
        }
 

	
 
        // Update branch marker
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        branch.cur_marker = new_marker;
 
        self.branch_markers.push(branch_id);
 

	
 
        self.encountered_ports.push(source_port_id);
 

	
 
        return (self.create_sync_header(ctx), data_header);
 
@@ -546,7 +559,7 @@ impl Consensus {
 
#[derive(Debug)]
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    channel_mapping: Vec<(ChannelId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
src/runtime2/inbox.rs
Show inline comments
 
@@ -13,10 +13,32 @@ use super::port::PortIdLocal;
 
#[derive(Debug, Copy, Clone)]
 
pub(crate) struct PortAnnotation {
 
    pub port_id: PortIdLocal,
 
    pub registered_id: Option<BranchId>,
 
    pub registered_id: Option<BranchMarker>,
 
    pub expected_firing: Option<bool>,
 
}
 

	
 
/// Marker for a branch in a port mapping. A marker is, like a branch ID, a
 
/// unique identifier for a branch, but differs in that a branch only has one
 
/// branch ID, but might have multiple associated markers (i.e. one branch
 
/// performing a `put` three times will generate three markers.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchMarker{
 
    marker: u32,
 
}
 

	
 
impl BranchMarker {
 
    #[inline]
 
    pub(crate) fn new(marker: u32) -> Self {
 
        debug_assert!(marker != 0);
 
        return Self{ marker };
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn new_invalid() -> Self {
 
        return Self{ marker: 0 }
 
    }
 
}
 

	
 
/// The header added by the synchronization algorithm to all.
 
#[derive(Debug, Clone)]
 
pub(crate) struct SyncHeader {
 
@@ -31,7 +53,7 @@ pub(crate) struct DataHeader {
 
    pub expected_mapping: Vec<PortAnnotation>,
 
    pub sending_port: PortIdLocal,
 
    pub target_port: PortIdLocal,
 
    pub new_mapping: BranchId,
 
    pub new_mapping: BranchMarker,
 
}
 

	
 
// TODO: Very much on the fence about this. On one hand I thought making it a
src/runtime2/native.rs
Show inline comments
 
@@ -183,54 +183,46 @@ impl ConnectorApplication {
 
            match &cur_instruction {
 
                ApplicationSyncAction::Put(port_id, content) => {
 
                    let port_id = *port_id;
 
                    let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                    if consistency == Consistency::Valid {
 
                        let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                        let message = Message::Data(DataMessage {
 
                            sync_header,
 
                            data_header,
 
                            content: DataContent::Message(content.clone()),
 
                        });
 
                        comp_ctx.submit_message(message);
 
                        self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                        return ConnectorScheduling::Immediate;
 
                    } else {
 
                        branch.sync_state = SpeculativeState::Inconsistent;
 
                    }
 

	
 
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                    let message = Message::Data(DataMessage {
 
                        sync_header,
 
                        data_header,
 
                        content: DataContent::Message(content.clone()),
 
                    });
 
                    comp_ctx.submit_message(message);
 
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                    return ConnectorScheduling::Immediate;
 
                },
 
                ApplicationSyncAction::Get(port_id) => {
 
                    let port_id = *port_id;
 
                    let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                    if consistency == Consistency::Valid {
 
                        branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                        branch.awaiting_port = port_id;
 
                        self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                        let mut any_message_received = false;
 
                        for message in comp_ctx.get_read_data_messages(port_id) {
 
                            if self.consensus.branch_can_receive(branch_id, &message) {
 
                                // This branch can receive the message, so we do the
 
                                // fork-and-receive dance
 
                                let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                                let branch = &mut self.tree[receiving_branch_id];
 
                                debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
                                self.branch_extra.push(instruction_idx + 1);
 

	
 
                                branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 

	
 
                                self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                                self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                                self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                                any_message_received = true;
 
                            }
 
                        }
 

	
 
                        if any_message_received {
 
                            return ConnectorScheduling::Immediate;
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    branch.awaiting_port = port_id;
 
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                    let mut any_message_received = false;
 
                    for message in comp_ctx.get_read_data_messages(port_id) {
 
                        if self.consensus.branch_can_receive(branch_id, &message) {
 
                            // This branch can receive the message, so we do the
 
                            // fork-and-receive dance
 
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[receiving_branch_id];
 
                            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
                            self.branch_extra.push(instruction_idx + 1);
 

	
 
                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                            any_message_received = true;
 
                        }
 
                    } else {
 
                        branch.sync_state = SpeculativeState::Inconsistent;
 
                    }
 

	
 
                    if any_message_received {
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                }
 
            }
src/runtime2/scheduler.rs
Show inline comments
 
@@ -348,11 +348,11 @@ impl Scheduler {
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
        // println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
        // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
    }
 
}
 

	
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -9,12 +9,10 @@ use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
//
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
pub(crate) const NUM_THREADS: u32 = 1;     // number of threads in runtime
 
pub(crate) const NUM_INSTANCES: u32 = 1;   // number of test instances constructed
 
pub(crate) const NUM_LOOPS: u32 = 1;       // number of loops within a single test (not used by all tests)
 
pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
0 comments (0 inline, 0 general)