Changeset - a43d61913724
[Not reviewed]
0 9 0
MH - 4 years ago 2021-10-20 09:00:45
contact@maxhenger.nl
prepare for debugging
9 files changed with 225 insertions and 191 deletions:
0 comments (0 inline, 0 general)
src/protocol/mod.rs
Show inline comments
 
mod arena;
 
pub(crate) mod eval;
 
pub(crate) mod input_source;
 
mod parser;
 
#[cfg(test)] mod tests;
 

	
 
pub(crate) mod ast;
 
pub(crate) mod ast_printer;
 

	
 
use std::sync::Mutex;
 

	
 
use crate::collections::{StringPool, StringRef};
 
use crate::common::*;
 
use crate::protocol::ast::*;
 
use crate::protocol::eval::*;
 
use crate::protocol::input_source::*;
 
use crate::protocol::parser::*;
 
use crate::protocol::type_table::*;
 

	
 
/// A protocol description module
 
pub struct Module {
 
    pub(crate) source: InputSource,
 
    pub(crate) root_id: RootId,
 
    pub(crate) name: Option<StringRef<'static>>,
 
}
 
/// Description of a protocol object, used to configure new connectors.
 
#[repr(C)]
 
pub struct ProtocolDescription {
 
    pub(crate) modules: Vec<Module>,
 
    pub(crate) heap: Heap,
 
    pub(crate) types: TypeTable,
 
    pub(crate) pool: Mutex<StringPool>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct ComponentState {
 
    pub(crate) prompt: Prompt,
 
}
 

	
 
#[allow(dead_code)]
 
pub(crate) enum EvalContext<'a> {
 
    Nonsync(&'a mut NonsyncProtoContext<'a>),
 
    Sync(&'a mut SyncProtoContext<'a>),
 
    None,
 
}
 
//////////////////////////////////////////////
 

	
 
#[derive(Debug)]
 
pub enum ComponentCreationError {
 
    ModuleDoesntExist,
 
    DefinitionDoesntExist,
 
    DefinitionNotComponent,
 
    InvalidNumArguments,
 
    InvalidArgumentType(usize),
 
    UnownedPort,
 
}
 

	
 
impl std::fmt::Debug for ProtocolDescription {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "(An opaque protocol description)")
 
    }
 
}
 
impl ProtocolDescription {
 
    // TODO: Allow for multi-file compilation
 
    pub fn parse(buffer: &[u8]) -> Result<Self, String> {
 
        // TODO: @fixme, keep code compilable, but needs support for multiple
 
        //  input files.
 
        let source = InputSource::new(String::new(), Vec::from(buffer));
 
        let mut parser = Parser::new();
 
        parser.feed(source).expect("failed to feed source");
 
        
 
        if let Err(err) = parser.parse() {
 
            println!("ERROR:\n{}", err);
 
            return Err(format!("{}", err))
 
        }
 

	
 
        debug_assert_eq!(parser.modules.len(), 1, "only supporting one module here for now");
 
        let modules: Vec<Module> = parser.modules.into_iter()
 
            .map(|module| Module{
 
                source: module.source,
 
                root_id: module.root_id,
 
                name: module.name.map(|(_, name)| name)
 
            })
 
            .collect();
 

	
 
        return Ok(ProtocolDescription {
 
            modules,
 
            heap: parser.heap,
 
            types: parser.type_table,
 
            pool: Mutex::new(parser.string_pool),
 
        });
 
    }
 

	
 
    #[deprecated]
 
    pub(crate) fn component_polarities(
 
        &self,
 
        module_name: &[u8],
 
        identifier: &[u8],
 
    ) -> Result<Vec<Polarity>, AddComponentError> {
 
        use AddComponentError::*;
 

	
 
        let module_root = self.lookup_module_root(module_name);
 
        if module_root.is_none() {
 
            return Err(AddComponentError::NoSuchModule);
 
        }
 
        let module_root = module_root.unwrap();
 

	
 
        let root = &self.heap[module_root];
 
        let def = root.get_definition_ident(&self.heap, identifier);
 
        if def.is_none() {
 
            return Err(NoSuchComponent);
 
        }
 

	
 
        let def = &self.heap[def.unwrap()];
 
        if !def.is_component() {
 
            return Err(NoSuchComponent);
 
        }
 

	
 
        for &param in def.parameters().iter() {
 
            let param = &self.heap[param];
 
            let first_element = &param.parser_type.elements[0];
 

	
 
            match first_element.variant {
 
                ParserTypeVariant::Input | ParserTypeVariant::Output => continue,
 
                _ => {
 
                    return Err(NonPortTypeParameters);
 
                }
 
            }
 
        }
 

	
 
        let mut result = Vec::new();
 
        for &param in def.parameters().iter() {
 
            let param = &self.heap[param];
 
            let first_element = &param.parser_type.elements[0];
 

	
 
            if first_element.variant == ParserTypeVariant::Input {
 
                result.push(Polarity::Getter)
 
            } else if first_element.variant == ParserTypeVariant::Output {
 
                result.push(Polarity::Putter)
 
            } else {
 
                unreachable!()
 
            }
 
        }
 
        Ok(result)
 
    }
 

	
 
    // expects port polarities to be correct
 
    #[deprecated]
 
    pub(crate) fn new_component(&self, module_name: &[u8], identifier: &[u8], ports: &[PortId]) -> ComponentState {
 
        let mut args = Vec::new();
 
        for (&x, y) in ports.iter().zip(self.component_polarities(module_name, identifier).unwrap()) {
 
            match y {
 
                Polarity::Getter => args.push(Value::Input(x)),
 
                Polarity::Putter => args.push(Value::Output(x)),
 
            }
 
        }
 

	
 
        let module_root = self.lookup_module_root(module_name).unwrap();
 
        let root = &self.heap[module_root];
 
        let def = root.get_definition_ident(&self.heap, identifier).unwrap();
 
        // TODO: Check for polymorph
 
        ComponentState { prompt: Prompt::new(&self.types, &self.heap, def, 0, ValueGroup::new_stack(args)) }
 
    }
 

	
 
    // TODO: Ofcourse, rename this at some point, perhaps even remove it in its
 
    //  entirety. Find some way to interface with the parameter's types.
 
    pub(crate) fn new_component_v2(
 
        &self, module_name: &[u8], identifier: &[u8], arguments: ValueGroup
 
    ) -> Result<ComponentState, ComponentCreationError> {
 
        // Find the module in which the definition can be found
 
        let module_root = self.lookup_module_root(module_name);
 
        if module_root.is_none() {
 
            return Err(ComponentCreationError::ModuleDoesntExist);
 
        }
 
        let module_root = module_root.unwrap();
 

	
 
        let root = &self.heap[module_root];
 
        let definition_id = root.get_definition_ident(&heap, identifier);
 
        let definition_id = root.get_definition_ident(&self.heap, identifier);
 
        if definition_id.is_none() {
 
            return Err(ComponentCreationError::DefinitionDoesntExist);
 
        }
 
        let definition_id = definition_id.unwrap();
 

	
 
        let definition = &self.heap[definition_id];
 
        if !definition.is_component() {
 
            return Err(ComponentCreationError::DefinitionNotComponent);
 
        }
 

	
 
        // Make sure that the types of the provided value group matches that of
 
        // the expected types.
 
        let definition = definition.as_component();
 
        if !definition.poly_vars.is_empty() {
 
            return Err(ComponentCreationError::DefinitionNotComponent);
 
        }
 

	
 
        // - check number of arguments
 
        let expr_data = self.types.get_procedure_expression_data(&definition_id, 0);
 
        if expr_data.arg_types.len() != arguments.values.len() {
 
            return Err(ComponentCreationError::InvalidNumArguments);
 
        }
 

	
 
        // - for each argument try to make sure the types match
 
        for arg_idx in 0..arguments.values.len() {
 
            let expected_type = &expr_data.arg_types[arg_idx];
 
            let provided_value = &arguments.values[arg_idx];
 
            if !self.verify_same_type(expected_type, 0, &arguments, provided_value) {
 
                return Err(ComponentCreationError::InvalidArgumentType(arg_idx));
 
            }
 
        }
 

	
 
        // By now we're sure that all of the arguments are correct. So create
 
        // the connector.
 
        return Ok(ComponentState{
 
            prompt: Prompt::new(&self.types, &self.heap, def, 0, arguments),
 
            prompt: Prompt::new(&self.types, &self.heap, definition_id, 0, arguments),
 
        });
 
    }
 

	
 
    fn lookup_module_root(&self, module_name: &[u8]) -> Option<RootId> {
 
        for module in self.modules.iter() {
 
            match &module.name {
 
                Some(name) => if name.as_bytes() == module_name {
 
                    return Some(module.root_id);
 
                },
 
                None => if module_name.is_empty() {
 
                    return Some(module.root_id);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn verify_same_type(&self, expected: &ConcreteType, expected_idx: usize, arguments: &ValueGroup, argument: &Value) -> bool {
 
        use ConcreteTypePart as CTP;
 

	
 
        macro_rules! match_variant {
 
            ($value:expr, $variant:expr) => {
 
                if let $variant(_) = $value { true } else { false }
 
            };
 
        }
 

	
 
        match &expected.parts[expected_idx] {
 
            CTP::Void | CTP::Message | CTP::Slice | CTP::Function(_, _) | CTP::Component(_, _) => unreachable!(),
 
            CTP::Bool => match_variant!(argument, Value::Bool),
 
            CTP::UInt8 => match_variant!(argument, Value::UInt8),
 
            CTP::UInt16 => match_variant!(argument, Value::UInt16),
 
            CTP::UInt32 => match_variant!(argument, Value::UInt32),
 
            CTP::UInt64 => match_variant!(argument, Value::UInt64),
 
            CTP::SInt8 => match_variant!(argument, Value::SInt8),
 
            CTP::SInt16 => match_variant!(argument, Value::SInt16),
 
            CTP::SInt32 => match_variant!(argument, Value::SInt32),
 
            CTP::SInt64 => match_variant!(argument, Value::SInt64),
 
            CTP::Character => match_variant!(argument, Value::Char),
 
            CTP::Bool => if let Value::Bool(_) = argument { true } else { false },
 
            CTP::UInt8 => if let Value::UInt8(_) = argument { true } else { false },
 
            CTP::UInt16 => if let Value::UInt16(_) = argument { true } else { false },
 
            CTP::UInt32 => if let Value::UInt32(_) = argument { true } else { false },
 
            CTP::UInt64 => if let Value::UInt64(_) = argument { true } else { false },
 
            CTP::SInt8 => if let Value::SInt8(_) = argument { true } else { false },
 
            CTP::SInt16 => if let Value::SInt16(_) = argument { true } else { false },
 
            CTP::SInt32 => if let Value::SInt32(_) = argument { true } else { false },
 
            CTP::SInt64 => if let Value::SInt64(_) = argument { true } else { false },
 
            CTP::Character => if let Value::Char(_) = argument { true } else { false },
 
            CTP::String => {
 
                // Match outer string type and embedded character types
 
                if let Value::String(heap_pos) = argument {
 
                    for element in &arguments.regions[*heap_pos as usize] {
 
                        if let Value::Char(_) = element {} else {
 
                            return false;
 
                        }
 
                    }
 
                } else {
 
                    return false;
 
                }
 

	
 
                return true;
 
            },
 
            CTP::Array => {
 
                if let Value::Array(heap_pos) = argument {
 
                    let heap_pos = *heap_pos;
 
                    for element in &arguments.regions[heap_pos as usize] {
 
                        if !self.verify_same_type(expected, expected_idx + 1, arguments, element) {
 
                            return false;
 
                        }
 
                    }
 
                    return true;
 
                } else {
 
                    return false;
 
                }
 
            },
 
            CTP::Input => match_variant!(argument, Value::Input),
 
            CTP::Output => match_variant!(argument, Value::Output),
 
            CTP::Input => if let Value::Input(_) = argument { true } else { false },
 
            CTP::Output => if let Value::Output(_) = argument { true } else { false },
 
            CTP::Instance(_definition_id, _num_embedded) => {
 
                todo!("implement full type checking on user-supplied arguments");
 
                return false;
 
            },
 
        }
 
    }
 
}
 

	
 
// TODO: @temp Should just become a concrete thing that is passed in
 
pub trait RunContext {
 
    fn did_put(&mut self, port: PortId) -> bool;
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup>; // None if still waiting on message
 
    fn fires(&mut self, port: PortId) -> Option<Value>; // None if not yet branched
 
    fn get_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared
 
}
 

	
 
#[derive(Debug)]
 
pub enum RunResult {
 
    // Can only occur outside sync blocks
 
    ComponentTerminated, // component has exited its procedure
 
    ComponentAtSyncStart,
 
    NewComponent(DefinitionId, i32, ValueGroup), // should also be possible inside sync
 
    NewChannel, // should also be possible inside sync
 
    // Can only occur inside sync blocks
 
    BranchInconsistent, // branch has inconsistent behaviour
 
    BranchMissingPortState(PortId), // branch doesn't know about port firing
 
    BranchMissingPortValue(PortId), // branch hasn't received message on input port yet
 
    BranchAtSyncEnd,
 
    BranchPut(PortId, ValueGroup),
 
}
 

	
 
impl ComponentState {
 
    pub(crate) fn run(&mut self, ctx: &mut impl RunContext, pd: &ProtocolDescription) -> RunResult {
 
        use EvalContinuation as EC;
 
        use RunResult as RR;
 

	
 
        loop {
 
            let step_result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, ctx);
 
            match step_result {
 
                Err(reason) => {
 
                    // TODO: @temp
 
                    println!("Evaluation error:\n{}", reason);
 
                    todo!("proper error handling/bubbling up");
 
                },
 
                Ok(continuation) => match continuation {
 
                    // TODO: Probably want to remove this translation
 
                    EC::Stepping => continue,
 
                    EC::Inconsistent => return RR::BranchInconsistent,
 
                    EC::Terminal => return RR::ComponentTerminated,
 
                    EC::SyncBlockStart => return RR::ComponentAtSyncStart,
 
                    EC::SyncBlockEnd => return RR::BranchAtSyncEnd,
 
                    EC::NewComponent(definition_id, monomorph_idx, args) =>
 
                        return RR::NewComponent(definition_id, monomorph_idx, args),
 
                    EC::NewChannel =>
 
                        return RR::NewChannel,
 
                    EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id),
 
                    EC::BlockGet(port_id) => return RR::BranchMissingPortValue(port_id),
 
                    EC::Put(port_id, value) => {
 
                        let value_group = ValueGroup::from_store(&self.prompt.store, &[value]);
 
                        return RR::BranchPut(port_id, value_group);
 
                    },
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
// TODO: @remove the old stuff
 
impl ComponentState {
 
    pub(crate) fn nonsync_run<'a: 'b, 'b>(
 
        &'a mut self,
 
        context: &'b mut NonsyncProtoContext<'b>,
 
        pd: &'a ProtocolDescription,
 
    ) -> NonsyncBlocker {
 
        let mut context = EvalContext::Nonsync(context);
 
        loop {
 
            let result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, &mut context);
 
            match result {
 
                Err(err) => {
 
                    println!("Evaluation error:\n{}", err);
 
                    panic!("proper error handling when component fails");
 
                },
 
                Ok(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return NonsyncBlocker::Inconsistent,
 
                    EvalContinuation::Terminal => return NonsyncBlocker::ComponentExit,
 
                    EvalContinuation::SyncBlockStart => return NonsyncBlocker::SyncBlockStart,
 
                    // Not possible to end sync block if never entered one
 
                    EvalContinuation::SyncBlockEnd => unreachable!(),
 
                    EvalContinuation::NewComponent(definition_id, monomorph_idx, args) => {
 
                        // Look up definition (TODO for now, assume it is a definition)
 
                        let mut moved_ports = HashSet::new();
 
                        for arg in args.values.iter() {
 
                            match arg {
 
                                Value::Output(port) => {
 
                                    moved_ports.insert(*port);
 
                                }
 
                                Value::Input(port) => {
 
                                    moved_ports.insert(*port);
 
                                }
 
                                _ => {}
 
                            }
 
                        }
 
                        for region in args.regions.iter() {
 
                            for arg in region {
 
                                match arg {
 
                                    Value::Output(port) => { moved_ports.insert(*port); },
 
                                    Value::Input(port) => { moved_ports.insert(*port); },
 
                                    _ => {},
 
                                }
 
                            }
 
                        }
 
                        let init_state = ComponentState { prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, args) };
 
                        context.new_component(moved_ports, init_state);
 
                        // Continue stepping
 
                        continue;
 
                    },
 
                    EvalContinuation::NewChannel => {
 
                        // Because of the way we emulate the old context for now, we can safely
 
                        // assume that this will never happen. The old context thingamajig always
 
                        // creates a channel, it never bubbles a "need to create a channel" message
 
                        // to the runtime
 
                        unreachable!();
 
                    },
 
                    // Outside synchronous blocks, no fires/get/put happens
 
                    EvalContinuation::BlockFires(_) => unreachable!(),
 
                    EvalContinuation::BlockGet(_) => unreachable!(),
 
                    EvalContinuation::Put(_, _) => unreachable!(),
 
                },
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn sync_run<'a: 'b, 'b>(
 
        &'a mut self,
 
        context: &'b mut SyncProtoContext<'b>,
 
        pd: &'a ProtocolDescription,
 
    ) -> SyncBlocker {
 
        let mut context = EvalContext::Sync(context);
 
        loop {
 
            let result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, &mut context);
 
            match result {
 
                Err(err) => {
 
                    println!("Evaluation error:\n{}", err);
 
                    panic!("proper error handling when component fails");
 
                },
 
                Ok(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return SyncBlocker::Inconsistent,
 
                    // First need to exit synchronous block before definition may end
 
                    EvalContinuation::Terminal => unreachable!(),
 
                    // No nested synchronous blocks
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => return SyncBlocker::SyncBlockEnd,
 
                    // Not possible to create component in sync block
 
                    EvalContinuation::NewComponent(_, _, _) => unreachable!(),
 
                    EvalContinuation::NewChannel => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => {
 
                        return SyncBlocker::CouldntCheckFiring(port);
 
                    },
 
                    EvalContinuation::BlockGet(port) => {
 
                        return SyncBlocker::CouldntReadMsg(port);
 
                    },
 
                    EvalContinuation::Put(port, message) => {
 
                        let payload;
 
                        match message {
 
                            Value::Null => {
 
                                return SyncBlocker::Inconsistent;
 
                            },
 
                            Value::Message(heap_pos) => {
 
                                // Create a copy of the payload
 
                                let values = &self.prompt.store.heap_regions[heap_pos as usize].values;
 
                                let mut bytes = Vec::with_capacity(values.len());
 
                                for value in values {
 
                                    bytes.push(value.as_uint8());
 
                                }
 
                                payload = Payload(Arc::new(bytes));
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        return SyncBlocker::PutMsg(port, payload);
 
                    }
 
                },
 
            }
 
        }
 
    }
 
}
 

	
 
impl RunContext for EvalContext<'_> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        match self {
 
            EvalContext::None => unreachable!(),
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::Deref;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::global_store::ConnectorKey;
 
use crate::runtime2::inbox::{MessageContents, OutgoingMessage, SolutionMessage};
 
use crate::runtime2::inbox::{MessageContents, SolutionMessage};
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::PortKind;
 
use crate::runtime2::port::{Port, PortKind};
 
use crate::runtime2::scheduler::ConnectorCtx;
 
use super::global_store::ConnectorId;
 
use super::inbox::{
 
    PrivateInbox, PublicInbox, OutgoingDataMessage, DataMessage, SyncMessage,
 
    PrivateInbox, PublicInbox, DataMessage, SyncMessage,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::PortIdLocal;
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchId {
 
    pub index: u32,
 
}
 

	
 
impl BranchId {
 
    fn new_invalid() -> Self {
 
        Self{ index: 0 }
 
    }
 

	
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        Self{ index }
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
    Finished,               // finished executing connector's code
 
    // Synchronous variants
 
    RunningInSync,          // running within a sync block
 
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
 
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 

	
 
pub(crate) struct Branch {
 
    index: BranchId,
 
    parent_index: BranchId,
 
    // Code execution state
 
    code_state: ComponentState,
 
    sync_state: SpeculativeState,
 
    next_branch_in_queue: Option<u32>,
 
    // Message/port state
 
    received: HashMap<PortIdLocal, DataMessage>, // TODO: @temporary, remove together with fires()
 
    ports_delta: Vec<PortOwnershipDelta>,
 
}
 

	
 
impl Branch {
 
    /// Constructs a non-sync branch. It is assumed that the code is at the
 
    /// first instruction
 
    pub(crate) fn new_initial_branch(component_state: ComponentState) -> Self {
 
        Branch{
 
            index: BranchId::new_invalid(),
 
            parent_index: BranchId::new_invalid(),
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            next_branch_in_queue: None,
 
            received: HashMap::new(),
 
            ports_delta: Vec::new(),
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync_branching_from(new_index: u32, parent_branch: &Branch) -> Self {
 
        debug_assert!(
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
 
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        );
 

	
 
        Branch{
 
            index: BranchId::new(new_index),
 
            parent_index: parent_branch.index,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            next_branch_in_queue: None,
 
            received: parent_branch.received.clone(),
 
            ports_delta: parent_branch.ports_delta.clone(),
 
        }
 
    }
 

	
 
    fn commit_to_sync(&mut self) {
 
        self.index = BranchId::new(0);
 
        self.parent_index = BranchId::new_invalid();
 
        self.sync_state = SpeculativeState::RunningNonSync;
 
        self.next_branch_in_queue = None;
 
        self.received.clear();
 
        self.ports_delta.clear();
 
    }
 
}
 

	
 
#[derive(Clone)]
 
struct PortAssignment {
 
    is_assigned: bool,
 
    last_registered_branch_id: BranchId, // invalid branch ID implies not assigned yet
 
    num_times_fired: u32,
 
}
 

	
 
impl PortAssignment {
 
    fn new_unassigned() -> Self {
 
        Self{
 
            is_assigned: false,
 
            last_registered_branch_id: BranchId::new_invalid(),
 
            num_times_fired: 0,
 
        }
 
    }
 

	
 
    #[inline]
 
    fn mark_speculative(&mut self, num_times_fired: u32) {
 
        debug_assert!(!self.last_registered_branch_id.is_valid());
 
        self.is_assigned = true;
 
        self.num_times_fired = num_times_fired;
 
    }
 

	
 
    #[inline]
 
    fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) {
 
        self.is_assigned = true;
 
        self.last_registered_branch_id = branch_id;
 
        self.num_times_fired = num_times_fired;
 
    }
 
}
 

	
 
#[derive(Clone, Eq)]
 
#[derive(Clone)]
 
struct PortOwnershipDelta {
 
    acquired: bool, // if false, then released ownership
 
    port_id: PortIdLocal,
 
}
 

	
 
enum PortOwnershipError {
 
    UsedInInteraction(PortIdLocal),
 
    AlreadyGivenAway(PortIdLocal)
 
}
 

	
 
/// Contains a description of the port mapping during a particular sync session.
 
/// TODO: Extend documentation
 
pub(crate) struct ConnectorPorts {
 
    // Essentially a mapping from `port_index` to `port_id`.
 
    pub owned_ports: Vec<PortIdLocal>,
 
    // Contains P*B entries, where P is the number of ports and B is the number
 
    // of branches. One can find the appropriate mapping of port p at branch b
 
    // at linear index `b*P+p`.
 
    pub port_mapping: Vec<PortAssignment>
 
}
 

	
 
impl ConnectorPorts {
 
    /// Constructs the initial ports object. Assumes the presence of the
 
    /// non-sync branch at index 0. Will initialize all entries for the non-sync
 
    /// branch.
 
    fn new(owned_ports: Vec<PortIdLocal>) -> Self {
 
        let num_ports = owned_ports.len();
 
        let mut port_mapping = Vec::with_capacity(num_ports);
 
        for _ in 0..num_ports {
 
            port_mapping.push(PortAssignment::new_unassigned());
 
        }
 

	
 
        Self{ owned_ports, port_mapping }
 
    }
 

	
 
    /// Prepares the port mapping for a new branch. Assumes that there is no
 
    /// intermediate branch index that we have skipped.
 
    fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) {
 
        let num_ports = self.owned_ports.len();
 
        let parent_base_idx = parent_branch_idx as usize * num_ports;
 
        let new_base_idx = new_branch_idx as usize * num_ports;
 

	
 
        debug_assert!(parent_branch_idx < new_branch_idx);
 
        debug_assert!(new_base_idx == self.port_mapping.len());
 

	
 
        self.port_mapping.reserve(num_ports);
 
        for offset in 0..num_ports {
 
            let parent_port = &self.port_mapping[parent_base_idx + offset];
 
            self.port_mapping.push(parent_port.clone());
 
        }
 
    }
 

	
 
    /// Adds a new port. Caller must make sure that the connector is not in the
 
    /// sync phase.
 
    fn add_port(&mut self, port_id: PortIdLocal) {
 
        debug_assert!(self.port_mapping.len() == self.owned_ports.len());
 
        debug_assert!(!self.owned_ports.contains(&port_id));
 
        self.owned_ports.push(port_id);
 
        self.port_mapping.push(PortAssignment::new_unassigned());
 
    }
 

	
 
    /// Commits to a particular branch. Essentially just removes the port
 
    /// mapping information generated during the sync phase.
 
    fn commit_to_sync(&mut self) {
 
        self.port_mapping.truncate(self.owned_ports.len());
 
        debug_assert!(self.port_mapping.iter().all(|v| {
 
            !v.is_assigned && !v.last_registered_branch_id.is_valid()
 
        }));
 
    }
 

	
 
    /// Removes a particular port from the connector. May only be done if the
 
    /// connector is in non-sync mode
 
    fn remove_port(&mut self, port_id: PortIdLocal) {
 
        debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode
 
        let port_index = self.get_port_index(port_id).unwrap();
 
        self.owned_ports.remove(port_index);
 
        self.port_mapping.remove(port_index);
 
    }
 

	
 
    /// Retrieves the index associated with a port id. Note that the port might
 
    /// not exist (yet) if a speculative branch has just received the port.
 
    /// TODO: But then again, one cannot use that port, right?
 
    #[inline]
 
    fn get_port_index(&self, port_id: PortIdLocal) -> Option<usize> {
 
        for (idx, port) in self.owned_ports.iter().enumerate() {
 
            if port == port_id {
 
            if *port == port_id {
 
                return Some(idx)
 
            }
 
        }
 

	
 
        return None
 
    }
 

	
 
    /// Retrieves the ID associated with the port at the provided index
 
    #[inline]
 
    fn get_port_id(&self, port_index: usize) -> PortIdLocal {
 
        return self.owned_ports[port_index];
 
    }
 

	
 
    #[inline]
 
    fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &self.port_mapping[mapped_idx];
 
    }
 

	
 
    #[inline]
 
    fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &mut self.port_mapping(mapped_idx);
 
        return &mut self.port_mapping[mapped_idx];
 
    }
 

	
 
    #[inline]
 
    fn num_ports(&self) -> usize {
 
        return self.owned_ports.len();
 
    }
 

	
 

	
 
    // Function for internal use: retrieve index in flattened port mapping array
 
    // based on branch/port index.
 
    #[inline]
 
    fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize {
 
        let branch_idx = branch_idx as usize;
 
        let num_ports = self.owned_ports.len();
 

	
 
        debug_assert!(port_idx < num_ports);
 
        debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len());
 

	
 
        return branch_idx * num_ports + port_idx;
 
    }
 
}
 

	
 
struct BranchQueue {
 
    first: u32,
 
    last: u32,
 
}
 

	
 
impl BranchQueue {
 
    #[inline]
 
    fn new() -> Self {
 
        Self{ first: 0, last: 0 }
 
    }
 

	
 
    #[inline]
 
    fn is_empty(&self) -> bool {
 
        debug_assert!((self.first == 0) == (self.last == 0));
 
        return self.first == 0;
 
    }
 

	
 
    #[inline]
 
    fn clear(&mut self) {
 
        self.first = 0;
 
        self.last = 0;
 
    }
 
}
 

	
 
/// Public fields of the connector that can be freely shared between multiple
 
/// threads.
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new() -> Self {
 
        ConnectorPublic{
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(false),
 
        }
 
    }
 
}
 

	
 
// TODO: Maybe prevent false sharing by aligning `public` to next cache line.
 
// TODO: Do this outside of the connector, create a wrapping struct
 
pub(crate) struct ConnectorPDL {
 
    // State and properties of connector itself
 
    in_sync: bool,
 
    // Branch management
 
    branches: Vec<Branch>, // first branch is always non-speculative one
 
    sync_active: BranchQueue,
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    sync_finished_last_handled: u32,
 
    sync_finished_last_handled: u32, // TODO: Change to BranchId?
 
    cur_round: u32,
 
    // Port/message management
 
    pub committed_to: Option<(ConnectorId, u64)>,
 
    pub inbox: PrivateInbox,
 
    pub ports: ConnectorPorts,
 
}
 

	
 
struct TempCtx {}
 
impl RunContext for TempCtx {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        todo!()
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        todo!()
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        todo!()
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message {
 
            MC::Data(message) => self.handle_data_message(message),
 
            MC::Sync(message) => self.handle_sync_message(message, ctx, delta_state),
 
            MC::RequestCommit(message) => self.handle_request_commit_message(message, ctx, delta_state),
 
            MC::ConfirmCommit(message) => self.handle_confirm_commit_message(message, ctx, delta_state),
 
            MC::Control(_) | MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, ctx, results);
 
            let scheduling = self.run_in_speculative_mode(pd, ctx, delta_state);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                // Retrieve first element in queue
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
                    next_id = self.sync_finished.first;
 
                } else {
 
                    let last_handled = &self.branches[self.sync_finished_last_handled as usize];
 
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                loop {
 
                    let branch_id = BranchId::new(next_id);
 
                    let branch = &self.branches[next_id as usize];
 
                    let branch_next = branch.next_branch_in_queue;
 

	
 
                    // Turn local solution into a message and send it along
 
                    // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, ctx);
 
                    if let Some(valid_solution) = solution_message {
 
                        self.submit_sync_solution(valid_solution, ctx, results);
 
                        self.submit_sync_solution(valid_solution, ctx, delta_state);
 
                    } else {
 
                        // Branch is actually invalid, but we only just figured
 
                        // it out. We need to mark it as invalid to prevent
 
                        // future use
 
                        Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id);
 
                        if branch_id == self.sync_finished_last_handled {
 
                        if branch_id.index == self.sync_finished_last_handled {
 
                            self.sync_finished_last_handled = self.sync_finished.last;
 
                        }
 

	
 
                        let branch = &mut self.branches[next_id as usize];
 
                        branch.sync_state = SpeculativeState::Inconsistent;
 
                    }
 

	
 
                    match branch_next {
 
                        Some(id) => next_id = id,
 
                        None => break,
 
                    }
 
                }
 

	
 
                self.sync_finished_last_handled = next_id;
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, ctx, results);
 
            let scheduling = self.run_in_deterministic_mode(pd, ctx, delta_state);
 
            return scheduling;
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    /// Constructs a representation of a connector. The assumption is that the
 
    /// initial branch is at the first instruction of the connector's code,
 
    /// hence is in a non-sync state.
 
    pub fn new(initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            sync_finished_last_handled: 0, // none at all
 
            cur_round: 0,
 
            committed_to: None,
 
            inbox: PrivateInbox::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
        }
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling connector messages
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    pub fn handle_data_message(&mut self, message: DataMessage) {
 
        self.inbox.insert_message(message);
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate.
 
    pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints
 

	
 
        // TODO: Optimize, use some kind of temp workspace vector
 
        let mut execution_path_branch_ids = Vec::new();
 

	
 
        if self.sync_finished_last_handled != 0 {
 
            // We have some solutions to match against
 
            let constraints_index = message.constraints
 
                .iter()
 
                .position(|v| v.connector_id == ctx.id)
 
                .unwrap();
 
            let constraints = &message.constraints[constraints_index].constraints;
 
            debug_assert!(!constraints.is_empty());
 

	
 
            // Note that we only iterate over the solutions we've already
 
            // handled ourselves, not necessarily
 
            let mut branch_index = self.sync_finished.first;
 
            'branch_loop: loop {
 
                // Load solution branch
 
                let branch = &self.branches[branch_index as usize];
 
                execution_path_branch_ids.clear();
 
                self.branch_ids_of_execution_path(BranchId::new(branch_index), &mut execution_path_branch_ids);
 

	
 
                // Check if the branch matches all of the applied constraints
 
                for constraint in constraints {
 
                    match constraint {
 
                        SyncBranchConstraint::SilentPort(silent_port_id) => {
 
                            let port_index = self.ports.get_port_index(*silent_port_id);
 
                            if port_index.is_none() {
 
                                // Nefarious peer
 
                                continue 'branch_loop;
 
                            }
 
                            let port_index = port_index.unwrap();
 

	
 
                            let mapping = self.ports.get_port(branch_index, port_index);
 
                            debug_assert!(mapping.is_assigned);
 

	
 
                            if mapping.num_times_fired != 0 {
 
                                // Not silent, constraint not satisfied
 
                                continue 'branch_loop;
 
                            }
 
                        },
 
                        SyncBranchConstraint::BranchNumber(expected_branch_id) => {
 
                            if !execution_path_branch_ids.contains(expected_branch_id) {
 
                                // Not the expected execution path, constraint not satisfied
 
                                continue 'branch_loop;
 
                            }
 
                        },
 
                        SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => {
 
                            let port_index = self.ports.get_port_index(*port_id);
 
                            if port_index.is_none() {
 
                                // Nefarious peer
 
                                continue 'branch_loop;
 
                            }
 
                            let port_index = port_index.unwrap();
 

	
 
                            let mapping = self.ports.get_port(branch_index, port_index);
 
                            if mapping.last_registered_branch_id != expected_branch_id {
 
                            if mapping.last_registered_branch_id != *expected_branch_id {
 
                                // Not the expected interaction on this port, constraint not satisfied
 
                                continue 'branch_loop;
 
                            }
 
                        },
 
                    }
 
                }
 

	
 
                // If here, then all of the external constraints were satisfied
 
                // for the current branch. But the branch itself also imposes
 
                // constraints. So while building up the new solution, make sure
 
                // that those are satisfied as well.
 
                // TODO: Code below can probably be merged with initial solution
 
                //  generation.
 

	
 
                // - clone old solution so we can add to it
 
                let mut new_solution = message.clone();
 

	
 
                // - determine the initial port mapping
 
                let num_ports = self.ports.num_ports();
 
                let mut new_solution_mapping = Vec::with_capacity(num_ports);
 
                for port_index in 0..self.ports.num_ports() {
 
                    let port_id = self.ports.get_port_id(port_index);
 
                    let mapping = self.ports.get_port(branch_index, port_index);
 
                    new_solution_mapping.push((port_id, mapping.last_registered_branch_id));
 
                }
 

	
 
                // - replace constraints with a local solution
 
                new_solution.constraints.remove(constraints_index);
 
                new_solution.local_solutions.push(SyncConnectorSolution{
 
                    connector_id: ctx.id,
 
                    terminating_branch_id: BranchId::new(branch_index),
 
                    execution_branch_ids: execution_path_branch_ids.clone(),
 
                    final_port_mapping: new_solution_mapping,
 
                });
 

	
 
                // - do a second pass on the ports to generate and add the
 
                //   constraints that should be applied to other connectors
 
                for port_index in 0..self.ports.num_ports() {
 
                    let port_id = self.ports.get_port_id(port_index);
 

	
 
                    let (peer_connector_id, peer_port_id, peer_is_getter) = {
 
                        let port = ctx.get_port(port_id);
 
                        (port.peer_connector, port.peer_id, port.kind == PortKind::Putter)
 
                    };
 

	
 
                    let mapping = self.ports.get_port(branch_index, port_index);
 
                    let constraint = if mapping.num_times_fired == 0 {
 
                        SyncBranchConstraint::SilentPort(peer_port_id)
 
                    } else {
 
                        if peer_is_getter {
 
                            SyncBranchConstraint::PortMapping(peer_port_id, mapping.last_registered_branch_id)
 
                        } else {
 
                            SyncBranchConstraint::BranchNumber(mapping.last_registered_branch_id)
 
                        }
 
                    };
 

	
 
                    match new_solution.add_or_check_constraint(peer_connector_id, constraint) {
 
                        None => continue 'branch_loop,
 
                        Some(false) => continue 'branch_loop,
 
                        Some(true) => {},
 
                        Err(_) => continue 'branch_loop,
 
                        Ok(false) => continue 'branch_loop,
 
                        Ok(true) => {},
 
                    }
 
                }
 

	
 
                // If here, then the newly generated solution is completely
 
                // compatible.
 
                self.submit_sync_solution(new_solution, ctx, results);
 

	
 
                // Consider the next branch
 
                if branch_index == self.sync_finished_last_handled {
 
                    // At the end of the previously handled solutions
 
                    break;
 
                }
 

	
 
                debug_assert!(branch.next_branch_in_queue.is_some()); // because we cannot be at the end of the queue
 
                branch_index = branch.next_branch_in_queue.unwrap();
 
            }
 
        }
 
    }
 

	
 
    fn handle_request_commit_message(&mut self, mut message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        let should_propagate_message = match &self.committed_to {
 
            Some((previous_origin, previous_comparison)) => {
 
                // Already committed to something. So will commit to this if it
 
                // takes precedence over the current solution
 
                message.comparison_number > *previous_comparison ||
 
                    (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_comparison.0)
 
                    (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_origin.0)
 
            },
 
            None => {
 
                // Not yet committed to a solution, so commit to this one
 
                true
 
            }
 
        };
 

	
 
        if should_propagate_message {
 
            self.committed_to = Some((message.connector_origin, message.comparison_number));
 

	
 
            if message.to_visit.is_empty() {
 
                // Visited all of the connectors, so every connector can now
 
                // apply the solution
 
                // TODO: Use temporary workspace
 
                let mut to_visit = Vec::with_capacity(message.local_solutions.len() - 1);
 
                for (connector_id, _) in &message.local_solutions {
 
                    if *connector_id != ctx.id {
 
                        to_visit.push(*connector_id);
 
                    }
 
                }
 

	
 
                message.to_visit = to_visit;
 
                self.handle_confirm_commit_message(message.clone(), ctx, delta_state);
 
                delta_state.outbox.push(MessageContents::ConfirmCommit(message));
 
            } else {
 
                // Not yet visited all of the connectors
 
                delta_state.outbox.push(MessageContents::RequestCommit(message));
 
            }
 
        }
 
    }
 

	
 
    fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        // Make sure this is the message we actually committed to. As long as
 
        // we're running on a single machine this is fine.
 
        // TODO: Take care of nefarious peers
 
        let (expected_connector_id, expected_comparison_number) =
 
            self.committed_to.unwrap();
 
        assert_eq!(message.connector_origin, expected_connector_id);
 
        assert_eq!(message.comparison_number, expected_comparison_number);
 

	
 
        // Find the branch we're supposed to commit to
 
        let (_, branch_id) = message.local_solutions
 
            .iter()
 
            .find(|(id, _)| *id == ctx.id)
 
            .unwrap();
 
        let branch_id = *branch_id;
 

	
 
        // Commit to the branch. That is: move the solution branch to the first
 
        // of the connector's branches
 
        self.in_sync = false;
 
        self.branches.swap(0, branch_id.index as usize);
 
        self.branches.truncate(1); // TODO: Or drain and do not deallocate?
 
        let solution = &mut self.branches[0];
 

	
 
        // Clear all of the other sync-related variables
 
        self.sync_active.clear();
 
        self.sync_pending_get.clear();
 
        self.sync_finished.clear();
 
        self.sync_finished_last_handled = 0;
 
        self.cur_round += 1;
 

	
 
        self.committed_to = None;
 
        self.inbox.clear();
 
        self.ports.commit_to_sync();
 

	
 
        // Add/remove any of the ports we lost during the sync phase
 
        for port_delta in &solution.ports_delta {
 
            if port_delta.acquired {
 
                self.ports.add_port(port_delta.port_id);
 
            } else {
 
                self.ports.remove_port(port_delta.port_id);
 
            }
 
        }
 
        solution.commit_to_sync();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Executing connector code
 
    // -------------------------------------------------------------------------
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
        let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        // Match statement contains `return` statements only if the particular
 
        // run result behind handled requires an immediate re-run of the
 
        // connector.
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that does not yet have an
 
                // assigned speculative value. So we need to create those
 
                // branches
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 

	
 
                debug_assert!(self.ports.owned_ports.contains(&local_port_id));
 
                let silent_branch = &*branch;
 

	
 
                // Create a copied branch who will have the port set to firing
 
                let firing_index = self.branches.len() as u32;
 
                let mut firing_branch = Branch::new_sync_branching_from(firing_index, silent_branch);
 
                self.ports.prepare_sync_branch(branch.index.index, firing_index);
 

	
 
                let firing_port = self.ports.get_port_mut(firing_index, local_port_index);
 
                firing_port.mark_speculative(1);
 

	
 
                // Assign the old branch a silent value
 
                let silent_port = self.ports.get_port_mut(silent_branch.index.index, local_port_index);
 
                silent_port.mark_speculative(0);
 

	
 
                // Run both branches again
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch.index);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch.index);
 
                self.branches.push(firing_branch);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
                // Branch performed a `get` on a port that has not yet received
 
                // a value in its inbox.
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id);
 
                if local_port_index.is_none() {
 
                    todo!("deal with the case where the port is acquired");
 
                }
 
                let local_port_index = local_port_index.unwrap();
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 

	
 
                // Check for port mapping assignment and, if present, if it is
 
                // consistent
 
                let is_valid_get = if port_mapping.is_assigned {
 
                    assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires`
 
                    port_mapping.num_times_fired == 1
 
                } else {
 
                    // Not yet assigned
 
                    port_mapping.mark_speculative(1);
 
                    true
 
                };
 

	
 
                if is_valid_get {
 
                    // Mark as a branching point for future messages
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch.index);
 

	
 
                    // But if some messages can be immediately applied, do so
 
                    // now.
 
                    let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    let mut did_have_messages = false;
 

	
 
                    for message in messages {
 
                        did_have_messages = true;
 

	
 
                        // For each message prepare a new branch to execute
 
                        let new_branch_index = self.branches.len() as u32;
 
                        let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch);
 
                        self.ports.prepare_sync_branch(branch.index.index, new_branch_index);
 

	
 
                        let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index);
 
                        port_mapping.last_registered_branch_id = message.sender_cur_branch_id;
 
                        debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1);
 

	
 
                        new_branch.received.insert(local_port_id, message.clone());
 

	
 
                        // If the message contains any ports then they will now
 
                        // be owned by the new branch
 
                        debug_assert!(results.ports.is_empty());
 
                        find_ports_in_value_group(&message.message, &mut results.ports);
 
                        Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports);
 
                        results.ports.clear();
 

	
 
                        // Schedule the new branch
 
                        debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync);
 
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index);
 
                        self.branches.push(new_branch);
 
                    }
 

	
 
                    if did_have_messages {
 
                        // If we did create any new branches, then we can run
 
                        // them immediately.
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Branch is done, go through all of the ports that are not yet
 
                // assigned and map them to non-firing.
 
                for port_idx in 0..self.ports.num_ports() {
 
                    let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx);
 
                    if !port_mapping.is_assigned {
 
                        port_mapping.mark_speculative(0);
 
                    }
 
                }
 

	
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch.index);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                // Branch performed a `put` on a particualar port.
 
                let local_port_id = PortIdLocal{ index: port_id.0.u32_suffix };
 
                let local_port_index = self.ports.get_port_index(local_port_id);
 
                if local_port_index.is_none() {
 
                    todo!("handle case where port was received before (i.e. in ports_delta)")
 
                }
 
                let local_port_index = local_port_index.unwrap();
 

	
 
                // Check the port mapping for consistency
 
                // TODO: For now we can only put once, so that simplifies stuff
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 
                let is_valid_put = if port_mapping.is_assigned {
 
                    // Already assigned, so must be speculative and one time
 
                    // firing, otherwise we are `put`ing multiple times.
 
                    if port_mapping.last_registered_branch_id.is_valid() {
 
                        // Already did a `put`
 
                        todo!("handle error through RunDeltaState");
 
                    } else {
 
                        // Valid if speculatively firing
 
                        port_mapping.num_times_fired == 1
 
                    }
 
                } else {
 
                    // Not yet assigned, do so now
 
                    true
 
                };
 

	
 
                if is_valid_put {
 
                    // Put in run results for thread to pick up and transfer to
 
                    // the correct connector inbox.
 
                    port_mapping.mark_definitive(branch.index, 1);
 
                    let message = OutgoingDataMessage {
 
                    let message = DataMessage{
 
                        sending_port: local_port_id,
 
                        sender_prev_branch_id: BranchId::new_invalid(),
 
                        sender_cur_branch_id: branch.index,
 
                        message: value_group,
 
                    };
 

	
 
                    // If the message contains any ports then we release our
 
                    // ownership over them in this branch
 
                    debug_assert!(results.ports.is_empty());
 
                    find_ports_in_value_group(&message.message, &mut results.ports);
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports);
 
                    results.ports.clear();
 

	
 
                    results.outbox.push(OutgoingMessage::Data(message));
 
                    results.outbox.push(MessageContents::Data(message));
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result),
 
        }
 

	
 
        // Not immediately scheduling, so schedule again if there are more
 
        // branches to run
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                // Need to wait until all children are terminated
 
                // TODO: Think about how to do this?
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::NotNow;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution and reschedule immediately
 
                self.in_sync = true;
 
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index);
 
                self.branches.push(first_sync_branch);
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Construction of a new component. Find all references to ports
 
                // inside of the arguments
 
                debug_assert!(results.ports.is_empty());
 
                find_ports_in_value_group(&arguments, &mut results.ports);
 

	
 
                if !results.ports.is_empty() {
 
                    // Ports changing ownership
 
                    if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &results.ports) {
 
                        todo!("fatal error handling");
 
                    }
 
                }
 

	
 
                // Add connector for later execution
 
                let new_connector_state = ComponentState {
 
                    prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, arguments)
 
                };
 
                let new_connector_ports = results.ports.clone(); // TODO: Do something with this
 
                let new_connector_branch = Branch::new_initial_branch(new_connector_state);
 
                let new_connector = ConnectorPDL::new(new_connector_branch, new_connector_ports);
 

	
 
                results.new_connectors.push(new_connector);
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewChannel => {
 
                // Need to prepare a new channel
 
                todo!("adding channels to some global context");
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Internal helpers
 
    // -------------------------------------------------------------------------
 

	
 
    // Helpers for management of the branches and their internally stored
 
    // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming
 
    // linked lists inside of the vector of branches.
 

	
 
    /// Pops from front of linked-list branch queue.
 
    fn pop_branch_from_queue(branches: &mut Vec<Branch>, queue: &mut BranchQueue) -> &mut Branch {
 
    fn pop_branch_from_queue<'a>(branches: &'a mut Vec<Branch>, queue: &mut BranchQueue) -> &'a mut Branch {
 
        debug_assert!(queue.first != 0);
 
        let branch = &mut branches[queue.first as usize];
 
        *queue.first = branch.next_branch_in_queue.unwrap_or(0);
 
        queue.first = branch.next_branch_in_queue.unwrap_or(0);
 
        branch.next_branch_in_queue = None;
 

	
 
        if *queue.first == 0 {
 
        if queue.first == 0 {
 
            // No more entries in queue
 
            debug_assert_eq!(*queue.last, branch.index.index);
 
            *queue.last = 0;
 
            debug_assert_eq!(queue.last, branch.index.index);
 
            queue.last = 0;
 
        }
 

	
 
        return branch;
 
    }
 

	
 
    /// Pushes branch at the end of the linked-list branch queue.
 
    fn push_branch_into_queue(
 
        branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_push: BranchId,
 
    ) {
 
        debug_assert!(to_push.is_valid());
 
        let to_push = to_push.index;
 

	
 
        if *queue.last == 0 {
 
        if queue.last == 0 {
 
            // No branches in the queue at all
 
            debug_assert_eq!(*queue.first, 0);
 
            debug_assert_eq!(queue.first, 0);
 
            branches[to_push as usize].next_branch_in_queue = None;
 
            *queue.first = to_push;
 
            *queue.last = to_push;
 
            queue.first = to_push;
 
            queue.last = to_push;
 
        } else {
 
            // Pre-existing branch in the queue
 
            debug_assert_ne!(*queue.first, 0);
 
            branches[*queue.last as usize].next_branch_in_queue = Some(to_push);
 
            *queue.last = to_push;
 
            debug_assert_ne!(queue.first, 0);
 
            branches[queue.last as usize].next_branch_in_queue = Some(to_push);
 
            queue.last = to_push;
 
        }
 
    }
 

	
 
    /// Removes branch from linked-list queue. Walks through the entire list to
 
    /// find the element (!). Assumption is that this is not called often.
 
    fn remove_branch_from_queue(
 
        branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_delete: BranchId,
 
    ) {
 
        debug_assert!(to_delete.is_valid()); // we're deleting a valid item
 
        debug_assert!(queue.first != 0 && queue.last != 0); // queue isn't empty to begin with
 

	
 
        // Retrieve branch and its next element
 
        let branch_to_delete = &mut branches[to_delete.index as usize];
 
        let branch_next_index_option = branch_to_delete.next_branch_in_queue;
 
        let branch_next_index_unwrapped = branch_next_index_option.unwrap_or(0);
 
        branch_to_delete.next_branch_in_queue = None;
 

	
 
        // Walk through all elements in queue to find branch to delete
 
        let mut prev_index = 0;
 
        let mut next_index = queue.first;
 

	
 
        while next_index != 0 {
 
            if next_index == to_delete.index {
 
                // Found the element we're going to delete
 
                // - check if at the first element or not
 
                if prev_index == 0 {
 
                    queue.first = branch_next_index_unwrapped;
 
                } else {
 
                    let prev_branch = &mut branches[prev_index as usize];
 
                    prev_branch.next_branch_in_queue = branch_next_index_option;
 
                }
 

	
 
                // - check if at last element or not (also takes care of "no elements left in queue")
 
                if branch_next_index_option.is_none() {
 
                    queue.last = prev_index;
 
                }
 

	
 
                return;
 
            }
 

	
 
            prev_index = next_index;
 
        }
 

	
 
        // If here, then we didn't find the element
 
        panic!("branch does not exist in provided queue");
 
    }
 

	
 
    // Helpers for local port management. Specifically for adopting/losing
 
    // ownership over ports, and for checking if specific ports can be sent
 
    // over another port.
 

	
 
    /// Releasing ownership of ports while in non-sync mode. This only occurs
 
    /// while instantiating new connectors
 
    fn release_ports_during_non_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        debug_assert!(!branch.index.is_valid()); // branch in non-sync mode
 

	
 
        for port_id in port_ids {
 
            // We must own the port, or something is wrong with our code
 
            todo!("Set up some kind of message router");
 
            debug_assert!(ports.get_port_index(*port_id).is_some());
 
            ports.remove_port(*port_id);
 
        }
 

	
 
        return Ok(())
 
    }
 

	
 
    /// Releasing ownership of ports during a sync-session. Will provide an
 
    /// error if the port was already used during a sync block.
 
    fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        todo!("unfinished: add port properties during final solution-commit msgs");
 
        debug_assert!(branch.index.is_valid()); // branch in sync mode
 

	
 
        for port_id in port_ids {
 
            match ports.get_port_index(*port_id) {
 
                Some(port_index) => {
 
                    // We (used to) own the port. Make sure it is not given away
 
                    // already and not used to put/get data.
 
                    let port_mapping = ports.get_port(branch.index.index, port_index);
 
                    if port_mapping.is_assigned && port_mapping.num_times_fired != 0 {
 
                        // Already used
 
                        return Err(PortOwnershipError::UsedInInteraction(*port_id));
 
                    }
 

	
 
                    for delta in &branch.ports_delta {
 
                        if delta.port_id == port_id {
 
                        if delta.port_id == *port_id {
 
                            // We cannot have acquired this port, because the
 
                            // call to `ports.get_port_index` returned an index.
 
                            debug_assert!(!delta.acquired);
 
                            return Err(PortOwnershipError::AlreadyGivenAway(*port_id));
 
                        }
 
                    }
 

	
 
                    branch.ports_delta.push(PortOwnershipDelta{
 
                        acquired: false,
 
                        port_id: *port_id,
 
                    });
 
                },
 
                None => {
 
                    // Not in port mapping, so we must have acquired it before,
 
                    // remove the acquirement.
 
                    let mut to_delete_index: isize = -1;
 
                    for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
 
                        if delta.port_id == *port_id {
 
                            debug_assert!(delta.acquired);
 
                            to_delete_index = delta_idx as isize;
 
                            break;
 
                        }
 
                    }
 

	
 
                    debug_assert!(to_delete_index != -1);
 
                    branch.ports_delta.remove(to_delete_index as usize);
 
                }
 
            }
 
        }
 

	
 
        return Ok(())
 
    }
 

	
 
    /// Acquiring ports during a sync-session.
 
    fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        todo!("unfinished: add port properties during final solution-commit msgs");
 
        debug_assert!(branch.index.is_valid()); // branch in sync mode
 

	
 
        'port_loop: for port_id in port_ids {
 
            for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
 
                if delta.port_id == *port_id {
 
                    if delta.acquired {
 
                        // Somehow already received this port.
 
                        // TODO: @security
 
                        todo!("take care of nefarious peers");
 
                    } else {
 
                        // Sending ports to ourselves
 
                        debug_assert!(ports.get_port_index(*port_id).is_some());
 
                        branch.ports_delta.remove(delta_idx);
 
                        continue 'port_loop;
 
                    }
 
                }
 
            }
 

	
 
            // If here then we can safely acquire the new port
 
            branch.ports_delta.push(PortOwnershipDelta{
 
                acquired: true,
 
                port_id: *port_id,
 
            });
 
        }
 

	
 
        return Ok(())
 
    }
 

	
 
    // Helpers for generating and handling sync messages (and the solutions that
 
    // are described by those sync messages)
 

	
 
    /// Generates the initial solution for a finished sync branch. If initial
 
    /// local solution is valid, then the appropriate message is returned.
 
    /// Otherwise the initial solution is inconsistent.
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, ctx: &ConnectorCtx) -> Option<SyncMessage> {
 
        // Retrieve branchg
 
        debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode
 
        let branch = &self.branches[branch_id.index as usize];
 
        debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd);
 

	
 
        // Set up storage (this is also the storage for all of the connectors
 
        // that will be visited, hence the initial size approximation)
 
        let mut all_branch_ids = Vec::new();
 
        self.branch_ids_of_execution_path(branch_id, &mut all_branch_ids);
 

	
 
        let num_ports = self.ports.num_ports();
 
        let approximate_peers = num_ports;
 
        let mut initial_solution_port_mapping = Vec::with_capacity(num_ports);
 
        for port_idx in 0..self.ports.num_ports() {
 
            let port_id = self.ports.get_port_id(port_idx);
 
            let port_desc = self.ports.get_port(branch_id.index, port_idx);
 

	
 
            // Note: if assigned then we expect a valid branch ID. Otherwise we have the "invalid
 
            // branch" as ID, marking that we want it to be silent
 
            debug_assert!(port_desc.is_assigned == port_desc.last_registered_branch_id.is_valid());
 
            initial_solution_port_mapping.push((port_id, port_desc.last_registered_branch_id));
 
        }
 

	
 
        let initial_local_solution = SyncConnectorSolution{
 
            connector_id: ctx.id,
 
            terminating_branch_id: branch_id,
 
            execution_branch_ids: all_branch_ids,
 
            final_port_mapping: initial_solution_port_mapping,
 
        };
 

	
 
        let mut sync_message = SyncMessage::new(initial_local_solution, approximate_peers);
 

	
 
        // Turn local port mapping into constraints on other connectors
 

	
 
        // - constraints on other components due to transferred ports
 
        for port_delta in &branch.ports_delta {
 
            // For transferred ports we always have two constraints: one for the
 
            // sender and one for the receiver, ensuring it was not used.
 
            // TODO: This will fail if a port is passed around multiple times.
 
            //  maybe a special "passed along" entry in `ports_delta`.
 
            if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) {
 
            if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)).unwrap() {
 
                return None;
 
            }
 

	
 
            // Might need to check if we own the other side of the channel
 
            let port = ctx.get_port(port_delta.port_id);
 
            if !sync_message.add_or_check_constraint(port.peer_connector, SyncBranchConstraint::SilentPort(port.peer_id)).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        // - constraints on other components due to owned ports
 
        for port_index in 0..self.ports.num_ports() {
 
            let port_id = self.ports.get_port_id(port_index);
 
            let port_mapping = self.ports.get_port(branch_id.index, port_index);
 
            let port = ctx.get_port(port_id);
 

	
 
            let constraint = if port_mapping.is_assigned {
 
                if port.kind == PortKind::Getter {
 
                    SyncBranchConstraint::BranchNumber(port_mapping.last_registered_branch_id)
 
                } else {
 
                    SyncBranchConstraint::PortMapping(port.peer_id, port_mapping.last_registered_branch_id)
 
                }
 
            } else {
 
                SyncBranchConstraint::SilentPort(port.peer_id)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() {
 
            if !sync_message.add_or_check_constraint(port.peer_connector, constraint).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        return Some(sync_message);
 
    }
 

	
 
    fn submit_sync_solution(&mut self, partial_solution: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        if partial_solution.to_visit.is_empty() {
 
            // Solution is completely consistent. So ask everyone to commit
 
            // TODO: Maybe another package for random?
 
            let comparison_number: u64 = unsafe {
 
                let mut random_array = [0u8; 8];
 
                getrandom::getrandom(&mut random_array);
 
                std::mem::transmute(random_array)
 
            };
 

	
 
            let num_local = partial_solution.local_solutions.len();
 

	
 
            let mut full_solution = SolutionMessage{
 
                comparison_number,
 
                connector_origin: ctx.id,
 
                local_solutions: Vec::with_capacity(num_local),
 
                to_visit: Vec::with_capacity(num_local - 1),
 
            };
 

	
 
            for local_solution in &partial_solution.local_solutions {
 
                full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id));
 
                if local_solution.connector_id != ctx.id {
 
                    full_solution.to_visit.push(local_solution.connector_id);
 
                }
 
            }
 

	
 
            debug_assert!(self.committed_to.is_none());
 
            self.committed_to = Some((full_solution.connector_origin, full_solution.comparison_number));
 
            results.outbox.push(MessageContents::RequestCommit(full_solution));
 
        } else {
 
            // Still have connectors to visit
 
            results.outbox.push(MessageContents::Sync(partial_solution));
 
        }
 
    }
 

	
 
    fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec<BranchId>) {
 
        debug_assert!(parents.is_empty());
 

	
 
        let mut next_branch_id = leaf_branch_id;
 
        debug_assert!(next_branch_id.is_valid());
 

	
 
        while next_branch_id.is_valid() {
 
            parents.push(next_branch_id);
 
            let branch = &self.branches[next_branch_id.index as usize];
 
            next_branch_id = branch.parent_index;
 
        }
 
    }
 
}
 

	
 
/// A data structure passed to a connector whose code is being executed that is
 
/// used to queue up various state changes that have to be applied after
 
/// running, e.g. the messages the have to be transferred to other connectors.
 
// TODO: Come up with a better name
 
pub(crate) struct RunDeltaState {
 
    // Variables that allow the thread running the connector to pick up global
 
    // state changes and try to apply them.
 
    pub outbox: Vec<MessageContents>,
 
    pub new_connectors: Vec<ConnectorPDL>,
 
    pub new_ports: Vec<Port>,
 
    // Workspaces
 
    pub ports: Vec<PortIdLocal>,
 
}
 

	
 
impl RunDeltaState {
 
    /// Constructs a new `RunDeltaState` object with the default amount of
 
    /// reserved memory
 
    pub fn new() -> Self {
 
        RunDeltaState{
 
            outbox: Vec::with_capacity(64),
 
            new_connectors: Vec::new(),
 
            new_ports: Vec::new(),
 
            ports: Vec::with_capacity(64),
 
        }
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
 
    Later,          // Schedule for running, at some later point in time
 
    NotNow,         // Do not reschedule for running
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
 
                for prev_port in ports.iter() {
 
                    if prev_port == cur_port {
 
                    if *prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/global_store.rs
Show inline comments
 
use std::ptr;
 
use std::sync::{Arc, Barrier, RwLock, RwLockReadGuard};
 
use std::sync::{Arc, RwLock};
 
use std::sync::atomic::{AtomicBool, AtomicU32};
 

	
 
use crate::collections::{MpmcQueue, RawVec};
 

	
 
use super::connector::{ConnectorPDL, ConnectorPublic};
 
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
 
use super::inbox::PublicInbox;
 
use super::scheduler::Router;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
 
use crate::runtime2::inbox::{DataMessage, MessageContents, SyncMessage};
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::native::{Connector, ConnectorApplication};
 
use crate::runtime2::scheduler::ConnectorCtx;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId(self.index);
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Copy, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) struct ConnectorId(pub u32);
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
 
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
 
//  to truly design this properly I need some benchmarks.
 
pub enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
 
        }
 
    }
 
}
 

	
 
pub struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: Router,
 
}
 

	
 
/// The registry containing all connectors. The idea here is that when someone
 
/// owns a `ConnectorKey`, then one has unique access to that connector.
 
/// Otherwise one has shared access.
 
///
 
/// This datastructure is built to be wrapped in a RwLock.
 
pub(crate) struct ConnectorStore {
 
    pub(crate) port_counter: Arc<AtomicU32>,
 
    inner: RwLock<ConnectorStoreInner>,
 
}
 

	
 
struct ConnectorStoreInner {
 
    connectors: RawVec<*mut ScheduledConnector>,
 
    free: Vec<usize>,
 
}
 

	
 
impl ConnectorStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        return Self{
 
            port_counter: Arc::new(AtomicU32::new(0)),
 
            inner: RwLock::new(ConnectorStoreInner {
 
                connectors: RawVec::with_capacity(capacity),
 
                free: Vec::with_capacity(capacity),
 
            }),
 
        };
 
    }
 

	
 
    /// Retrieves the shared members of the connector.
 
    pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get(connector_id.0 as usize);
 
            debug_assert!(!connector.is_null());
 
            return &*connector.public;
 
            return &(**connector).public;
 
        }
 
    }
 

	
 
    /// Retrieves a particular connector. Only the thread that pulled the
 
    /// associated key out of the execution queue should (be able to) call this.
 
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return *connector as &mut _;
 
            return &mut (**connector);
 
        }
 
    }
 

	
 
    pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey {
 
        // Connector interface does not own any initial ports, and cannot be
 
        // created by another connector
 
        let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)));
 
        return key;
 
    }
 

	
 
    /// Create a new connector, returning the key that can be used to retrieve
 
    /// and/or queue it. The caller must make sure that the constructed
 
    /// connector's code is initialized with the same ports as the ports in the
 
    /// `initial_ports` array.
 
    pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant, initial_ports: Vec<Port>) -> ConnectorKey {
 
    pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
 
        let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector));
 
        let new_connector = self.get_mut(&key);
 

	
 
        // Transferring ownership of ports (and crashing if there is a
 
        // programmer's mistake in port management)
 
        match &new_connector.connector {
 
            ConnectorVariant::UserDefined(connector) => {
 
                for port_id in &connector.ports.owned_ports {
 
                    let mut port = created_by.context.remove_port(*port_id);
 
                    new_connector.context.add_port(port);
 
                }
 
            },
 
            ConnectorVariant::Native(_) => unreachable!(),
 
        }
 

	
 
        return key;
 
    }
 

	
 
    pub(crate) fn destroy(&self, key: ConnectorKey) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            ptr::drop_in_place(*connector);
 
            // Note: but not deallocating!
 
        }
 

	
 
        lock.free.push(key.index as usize);
 
    }
 

	
 
    /// Creates a connector but does not set its initial ports
 
    fn create_connector_raw(&self, connector: ConnectorVariant) -> ConnectorKey {
 
        // Creation of the connector in the global store, requires a lock
 
        let index;
 
        {
 
            let lock = self.inner.write().unwrap();
 
            let connector = ScheduledConnector {
 
                connector,
 
                context: ConnectorCtx::new(self.port_counter.clone()),
 
                public: ConnectorPublic::new(),
 
                router: Router::new(),
 
            };
 

	
 
            let index;
 
            if lock.free.is_empty() {
 
                let connector = Box::into_raw(Box::new(connector));
 

	
 
                unsafe {
 
                    // Cheating a bit here. Anyway, move to heap, store in list
 
                    index = lock.connectors.len();
 
                    lock.connectors.push(connector);
 
                }
 
            } else {
 
                index = lock.free.pop().unwrap();
 

	
 
                unsafe {
 
                    let target = lock.connectors.get_mut(index);
 
                    debug_assert!(!target.is_null());
 
                    ptr::write(*target, connector);
 
                }
 
            }
 
        }
 

	
 
        // Setting of new connector's ID
 
        // Generate key and retrieve the connector to set its ID
 
        let key = ConnectorKey{ index: index as u32 };
 
        let new_connector = self.get_mut(&key);
 
        new_connector.context.id = key.downcast();
 

	
 
        // Transferring ownership of ports (and crashing if there is a
 
        // programmer's mistake in port management)
 
        match &new_connector.connector {
 
            ConnectorVariant::UserDefined(connector) => {
 
                for port_id in &connector.ports.owned_ports {
 
                    let mut port = created_by.context.remove_port(*port_id);
 
                    new_connector.context.add_port(port);
 
                }
 
            },
 
            ConnectorVariant::Native(_) => {}, // no initial ports (yet!)
 
        }
 

	
 
        // Return the connector key
 
        return key;
 
    }
 

	
 
    pub(crate) fn destroy(&self, key: ConnectorKey) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            ptr::drop_in_place(*connector);
 
            // Note: but not deallocating!
 
        }
 

	
 
        lock.free.push(key.index as usize);
 
    }
 
}
 

	
 
impl Drop for ConnectorStore {
 
    fn drop(&mut self) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        for idx in 0..lock.connectors.len() {
 
            unsafe {
 
                let memory = *lock.connectors.get_mut(idx);
 
                let _ = Box::from_raw(memory); // takes care of deallocation
 
            }
 
        }
 
    }
 
}
 

	
 
/// Global store of connectors, ports and queues that are used by the sceduler
 
/// threads. The global store has the appearance of a thread-safe datatype, but
 
/// one needs to be careful using it.
 
///
 
/// TODO: @docs
 
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
 
///     This includes the `should_exit` and `did_exit` pair!
 
pub struct GlobalStore {
 
    pub connector_queue: MpmcQueue<ConnectorKey>,
 
    pub connectors: ConnectorStore,
 
    pub should_exit: AtomicBool,    // signal threads to exit
 
}
 

	
 
impl GlobalStore {
 
    pub fn new() -> Self {
 
        Self{
 
            connector_queue: MpmcQueue::with_capacity(256),
 
            connectors: ConnectorStore::with_capacity(256),
 
            should_exit: AtomicBool::new(false),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/inbox.rs
Show inline comments
 
/**
 
inbox.rs
 

	
 
Contains various types of inboxes and message types for the connectors. There
 
are two kinds of inboxes:
 

	
 
The `PublicInbox` is a simple message queue. Messages are put in by various
 
threads, and they're taken out by a single thread. These messages may contain
 
control messages and may be filtered or redirected by the scheduler.
 

	
 
The `PrivateInbox` is a temporary storage for all messages that are received
 
within a certain sync-round.
 
**/
 

	
 
use std::collections::VecDeque;
 
use std::sync::{RwLock, RwLockReadGuard, Mutex};
 
use std::sync::atomic::{AtomicUsize, Ordering};
 
use std::sync::Mutex;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use super::connector::{BranchId, PortIdLocal};
 
use super::connector::BranchId;
 
use super::port::PortIdLocal;
 
use super::global_store::ConnectorId;
 

	
 
/// A message that has been delivered (after being imbued with the receiving
 
/// port by the scheduler) to a connector.
 
#[derive(Clone)]
 
pub struct DataMessage {
 
    pub sending_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId,
 
    pub sender_cur_branch_id: BranchId,
 
    pub message: ValueGroup,
 
}
 

	
 
#[derive(Clone)]
 
pub enum SyncBranchConstraint {
 
    SilentPort(PortIdLocal),
 
    BranchNumber(BranchId),
 
    PortMapping(PortIdLocal, BranchId),
 
}
 

	
 
#[derive(Clone)]
 
pub struct SyncConnectorSolution {
 
    pub connector_id: ConnectorId,
 
    pub terminating_branch_id: BranchId,
 
    pub execution_branch_ids: Vec<BranchId>, // no particular ordering of IDs enforced
 
    pub final_port_mapping: Vec<(PortIdLocal, BranchId)>
 
}
 

	
 
#[derive(Clone)]
 
pub struct SyncConnectorConstraints {
 
    pub connector_id: ConnectorId,
 
    pub constraints: Vec<SyncBranchConstraint>,
 
}
 

	
 
#[derive(Clone)]
 
pub struct SyncMessage {
 
    pub local_solutions: Vec<SyncConnectorSolution>,
 
    pub constraints: Vec<SyncConnectorConstraints>,
 
    pub to_visit: Vec<ConnectorId>,
 
}
 

	
 
// TODO: Shouldn't really be here, right?
 
impl SyncMessage {
 
    /// Creates a new sync message. Assumes that it is created by a connector
 
    /// that has just encountered a new local solution.
 
    pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self {
 
        let mut local_solutions = Vec::with_capacity(approximate_peers);
 
        local_solutions.push(initial_solution);
 

	
 
        return Self{
 
            local_solutions,
 
            constraints: Vec::with_capacity(approximate_peers),
 
            to_visit: Vec::with_capacity(approximate_peers),
 
        };
 
    }
 

	
 
    /// Checks if a connector has already provided a local solution
 
    pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool {
 
        return self.local_solutions
 
            .iter()
 
            .any(|v| v.connector_id == connector_id);
 
    }
 

	
 
    /// Adds a new constraint. If the connector has already provided a local
 
    /// solution then the constraint will be checked. Otherwise the constraint
 
    /// will be added to the solution. If this is the first constraint for a
 
    /// connector then it will be added to the connectors that still have to be
 
    /// visited.
 
    ///
 
    /// If this returns true then the constraint was added, or the local
 
    /// solution for the specified connector satisfies the constraint. If this
 
    /// function returns an error then we're dealing with a nefarious peer.
 
    pub(crate) fn add_or_check_constraint(
 
        &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint
 
    ) -> Result<bool, ()> {
 
        if self.has_local_solution_for(connector_id) {
 
            return self.check_constraint(connector_id, constraint);
 
        } else {
 
            self.add_constraint(connector_id, constraint);
 
            return Ok(true);
 
        }
 
    }
 

	
 
    /// Pushes a new connector constraint. Caller must ensure that the solution
 
    /// has not yet arrived at the specified connector (because then it would no
 
    /// longer have constraints, but a proposed solution instead).
 
    pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) {
 
        debug_assert!(!self.has_local_solution_for(connector_id));
 

	
 
        let position = self.constraints
 
            .iter()
 
            .position(|v| v.connector_id == connector_id);
 

	
 
        match position {
 
            Some(index) => {
 
                // Has pre-existing constraints
 
                debug_assert!(self.to_visit.contains(&connector_id));
 
                let entry = &mut self.constraints[index];
 
                entry.constraints.push(constraint);
 
            },
 
            None => {
 
                debug_assert!(!self.to_visit.contains(&connector_id));
 
                self.constraints.push(SyncConnectorConstraints{
 
                    connector_id,
 
                    constraints: vec![constraint],
 
                });
 
                self.to_visit.push(connector_id);
 
            }
 
        }
 
    }
 

	
 
    /// Checks if a constraint is satisfied by a solution. Caller must make sure
 
    /// that a local solution has already been provided. Will return an error
 
    /// value only if the provided constraint does not make sense (i.e. a
 
    /// nefarious peer has supplied a constraint with a port we do not own).
 
    pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result<bool, ()>  {
 
        debug_assert!(self.has_local_solution_for(connector_id));
 

	
 
        let entry = self.local_solutions
 
            .iter()
 
            .find(|v| v.connector_id == connector_id)
 
            .unwrap();
 

	
 
        match constraint {
 
            SyncBranchConstraint::SilentPort(silent_port_id) => {
 
                for (port_id, mapped_id) in &entry.final_port_mapping {
 
                    if port_id == silent_port_id {
 
                    if *port_id == silent_port_id {
 
                        // If silent, then mapped ID is invalid
 
                        return Ok(!mapped_id.is_valid())
 
                    }
 
                }
 

	
 
                return Err(());
 
            },
 
            SyncBranchConstraint::BranchNumber(expected_branch_id) => {
 
                return Ok(entry.execution_branch_ids.contains(&expected_branch_id));
 
            },
 
            SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => {
 
                for (port_id, mapped_id) in &entry.final_port_mapping {
 
                    if port_id == port_id {
 
                        return Ok(*mapped_id == expected_branch_id);
 
                    }
 
                }
 

	
 
                return Err(());
 
            },
 
        }
 
    }
 
}
 

	
 
#[derive(Clone)]
 
pub struct SolutionMessage {
 
    pub comparison_number: u64,
 
    pub connector_origin: ConnectorId,
 
    pub local_solutions: Vec<(ConnectorId, BranchId)>,
 
    pub to_visit: Vec<ConnectorId>,
 
}
 

	
 
/// A control message. These might be sent by the scheduler to notify eachother
 
/// of asynchronous state changes.
 
#[derive(Clone)]
 
pub struct ControlMessage {
 
    pub id: u32, // generic identifier, used to match request to response
 
    pub content: ControlMessageVariant,
 
}
 

	
 
#[derive(Clone)]
 
pub enum ControlMessageVariant {
 
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
 
    Ack, // acknowledgement of previous control message, matching occurs through control message ID.
 
}
 

	
 
/// Generic message contents.
 
#[derive(Clone)]
 
pub enum MessageContents {
 
    Data(DataMessage),              // data message, handled by connector
 
    Sync(SyncMessage),              // sync message, handled by both connector/scheduler
 
    RequestCommit(SolutionMessage), // solution message, requesting participants to commit
 
    ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to
 
    Control(ControlMessage),        // control message, handled by scheduler
 
    Ping,                           // ping message, intentionally waking up a connector (used for native connectors)
 
}
 

	
 
pub struct Message {
 
    pub sending_connector: ConnectorId,
 
    pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector)
 
    pub contents: MessageContents,
 
}
 

	
 
/// The public inbox of a connector. The thread running the connector that owns
 
/// this inbox may retrieved from it. Non-owning threads may only put new
 
/// messages inside of it.
 
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
 
//  Should behave as a MPSC queue.
 
pub struct PublicInbox {
 
    messages: Mutex<VecDeque<Message>>,
 
}
 

	
 
impl PublicInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Mutex::new(VecDeque::new()),
 
        }
 
    }
 

	
 
    pub fn insert_message(&self, message: Message) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_message(&self) -> Option<Message> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 

	
 
    pub fn is_empty(&self) -> bool {
 
        let lock = self.messages.lock().unwrap();
 
        return lock.is_empty();
 
    }
 
}
 

	
 
pub struct PrivateInbox {
 
    // "Normal" messages, intended for a PDL protocol. These need to stick
 
    // around during an entire sync-block (to handle `put`s for which the
 
    // corresponding `get`s have not yet been reached).
 
    messages: Vec<DataMessage>,
 
    len_read: usize,
 
}
 

	
 
impl PrivateInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Vec::new(),
 
            len_read: 0,
 
        }
 
    }
 

	
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
 
    /// nothing is inserted..
 
    pub fn insert_message(&mut self, message: DataMessage) {
 
        for existing in self.messages.iter() {
 
            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
 
                    existing.sender_cur_branch_id == message.sender_cur_branch_id &&
 
                    existing.receiving_port == message.receiving_port {
 
                    existing.sending_port == message.sending_port {
 
                // Message was already received
 
                return;
 
            }
 
        }
 

	
 
        self.messages.push(message);
 
    }
 

	
 
    /// Retrieves all previously read messages that satisfy the provided
 
    /// speculative conditions. Note that the inbox remains read-locked until
 
    /// the returned iterator is dropped. Should only be called by the
 
    /// inbox-reader (i.e. the thread executing a connector's PDL code).
 
    ///
 
    /// This function should only be used to check if already-received messages
 
    /// could be received by a newly encountered `get` call in a connector's
 
    /// PDL code.
 
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
 
        return InboxMessageIter{
 
            messages: &self.messages,
 
            next_index: 0,
 
            max_index: self.len_read,
 
            match_port_id: port_id,
 
            match_prev_branch_id: prev_branch_id,
 
        };
 
    }
 

	
 
    /// Retrieves the next unread message. Should only be called by the
 
    /// inbox-reader.
 
    pub fn next_message(&mut self) -> Option<&DataMessage> {
 
        if self.len_read == self.messages.len() {
 
            return None;
 
        }
 

	
 
        let to_return = &self.messages[self.len_read];
 
        self.len_read += 1;
 
        return Some(to_return);
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
        self.len_read = 0;
 
    }
 
}
 

	
 
/// Iterator over previously received messages in the inbox.
 
pub struct InboxMessageIter<'i> {
 
    messages: &'i Vec<DataMessage>,
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
    match_prev_branch_id: BranchId,
 
}
 

	
 
impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> {
 
    type Item = &'m DataMessage;
 
impl<'i> Iterator for InboxMessageIter<'i> {
 
    type Item = &'i DataMessage;
 

	
 
    fn next(&'m mut self) -> Option<Self::Item> {
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let cur_message = &self.messages[self.next_index];
 
            if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
 
                // Found a match
 
                break;
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        if self.next_index == self.max_index {
 
            return None;
 
        }
 

	
 
        let message = &self.messages[self.next_index];
 
        self.next_index += 1;
 
        return Some(message);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/messages.rs
Show inline comments
 
use std::cmp::Ordering;
 
use std::collections::hash_map::Entry;
 
use std::collections::HashMap;
 

	
 
use crate::common::Id;
 
use crate::PortId;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::connector::{BranchId, PortIdLocal};
 

	
 
/// A message residing in a connector's inbox (waiting to be put into some kind
 
/// of speculative branch), or a message waiting to be sent.
 
#[derive(Clone)]
 
pub struct BufferedMessage {
 
    pub(crate) sending_port: PortId,
 
    pub(crate) receiving_port: PortId,
 
    pub(crate) peer_prev_branch_id: Option<u32>,
 
    pub(crate) peer_cur_branch_id: u32,
 
    pub(crate) message: ValueGroup,
 
}
 

	
 
/// A connector's global inbox. Any received message ends up here. This is
 
/// because a message might be received before a branch arrives at the
 
/// corresponding `get()` that is supposed to receive that message. Hence we
 
/// need to store it for all future branches that might be able to receive it.
 
pub struct ConnectorInbox {
 
    // TODO: @optimize, HashMap + Vec is a bit stupid.
 
    messages: HashMap<PortAction, Vec<BufferedMessage>>
 
}
 

	
 

	
 
/// An action performed on a port. Unsure about this
 
#[derive(PartialEq, Eq, Hash)]
 
struct PortAction {
 
    port_id: u32,
 
    prev_branch_id: Option<u32>,
 
}
 

	
 
// TODO: @remove
 
impl ConnectorInbox {
 
    pub fn new() -> Self {
 
        Self {
 
            messages: HashMap::new(),
 
        }
 
    }
 

	
 
    /// Inserts a new message into the inbox.
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Messages are received from actors we generally cannot
 
        //  trust, and may be unreliable, so messages may be received multiple
 
        //  times or have spoofed branch IDs. Debug asserts are present for the
 
        //  initial implementation.
 

	
 
        // If it is the first message on the port, then we cannot possible have
 
        // a previous port mapping on that port.
 
        let port_action = PortAction{
 
            port_id: message.receiving_port.0.u32_suffix,
 
            prev_branch_id: message.peer_prev_branch_id,
 
        };
 

	
 
        match self.messages.entry(port_action) {
 
            Entry::Occupied(mut entry) => {
 
                let entry = entry.get_mut();
 
                debug_assert!(
 
                    entry.iter()
 
                        .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id)
 
                        .is_none(),
 
                    "inbox already contains sent message (same new branch ID)"
 
                );
 

	
 
                entry.push(message);
 
            },
 
            Entry::Vacant(entry) => {
 
                entry.insert(vec![message]);
 
            }
 
        }
 
    }
 

	
 
    /// Checks if the provided port (and the branch id mapped to that port)
 
    /// correspond to any messages in the inbox.
 
    pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option<u32>) -> Option<&[BufferedMessage]> {
 
        let port_action = PortAction{
 
            port_id,
 
            prev_branch_id: prev_branch_id_at_port,
 
        };
 

	
 
        match self.messages.get(&port_action) {
 
            Some(messages) => return Some(messages.as_slice()),
 
            None => return None,
 
        }
 
    }
 

	
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 
}
 

	
 
/// A connector's outbox. A temporary storage for messages that are sent by
 
/// branches performing `put`s until we're done running all branches and can
 
/// actually transmit the messages.
 
pub struct ConnectorOutbox {
 
    messages: Vec<BufferedMessage>,
 
}
 

	
 
impl ConnectorOutbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Vec::new(),
 
        }
 
    }
 

	
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Depending on the way we implement the runtime in the
 
        //  future we might end up not trusting "our own code" (i.e. in case
 
        //  the connectors we are running are described by foreign code)
 
        debug_assert!(
 
            self.messages.iter()
 
                .find(|v|
 
                    v.sending_port == message.sending_port &&
 
                    v.peer_prev_branch_id == message.peer_prev_branch_id
 
                )
 
                .is_none(),
 
            "messages was already registered for sending"
 
        );
 

	
 
        self.messages.push(message);
 
    }
 

	
 
    pub fn take_next_message_to_send(&mut self) -> Option<BufferedMessage> {
 
        self.messages.pop()
 
    }
 

	
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
// Structure of module
 

	
 
mod runtime;
 
mod messages;
 
mod connector;
 
mod native;
 
mod port;
 
mod global_store;
 
mod scheduler;
 
mod inbox;
 

	
 
#[cfg(test)] mod tests;
 

	
 
// Imports
 

	
 
use std::sync::{Arc, Mutex};
 
use std::sync::atomic::Ordering;
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::protocol::eval::*;
 
use crate::{common::Id, PortId, ProtocolDescription};
 
use crate::ProtocolDescription;
 

	
 
use global_store::{ConnectorVariant, GlobalStore};
 
use scheduler::Scheduler;
 
use crate::protocol::ComponentCreationError;
 
use connector::{Branch, ConnectorPDL, find_ports_in_value_group};
 
use native::{ConnectorApplication, ApplicationInterface};
 

	
 

	
 
// Runtime API
 
// TODO: Exit condition is very dirty. Take into account:
 
//  - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working
 
//  - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done
 
//  - User-owned interfaces: As long as these are owned user may still decide to create new connectors.
 
pub struct Runtime {
 
    inner: Arc<RuntimeInner>,
 
}
 

	
 
pub(crate) struct RuntimeInner {
 
    pub(crate) global_store: GlobalStore,
 
    pub(crate) protocol_description: ProtocolDescription,
 
    schedulers: Mutex<Vec<JoinHandle<()>>>, // TODO: Revise, make exit condition something like: all interfaces dropped
 
}
 

	
 
// TODO: Come back to this at some point
 
unsafe impl Send for RuntimeInner {}
 
unsafe impl Sync for RuntimeInner {}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime {
 
        // Setup global state
 
        assert!(num_threads > 0, "need a thread to run connectors");
 
        let runtime_inner = Arc::new(RuntimeInner{
 
            global_store: GlobalStore::new(),
 
            protocol_description,
 
            schedulers: Mutex::new(Vec::new()),
 
        });
 

	
 
        // Launch threads
 
        {
 
            let mut schedulers = Vec::with_capacity(num_threads);
 
            for _ in 0..num_threads {
 
                let mut scheduler = Scheduler::new(runtime_inner.clone());
 
                let cloned_runtime_inner = runtime_inner.clone();
 
                let thread = thread::spawn(move || {
 
                    let mut scheduler = Scheduler::new(cloned_runtime_inner);
 
                    scheduler.run();
 
                });
 

	
 
                schedulers.push(thread);
 
            }
 

	
 
            let mut lock = runtime_inner.schedulers.lock().unwrap();
 
            *lock = schedulers;
 
        }
 

	
 
        // Return runtime
 
        return Runtime{ inner: runtime_inner };
 
    }
 

	
 
    /// Returns a new interface through which channels and connectors can be
 
    /// created.
 
    pub fn create_interface(&self) -> ApplicationInterface {
 
        let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
 
        let connector = Box::new(connector);
 

	
 
        let connector_key = self.global_store.connectors.create(ConnectorVariant::Native(connector));
 
        let connector_key = self.inner.global_store.connectors.create_interface(connector);
 
        interface.set_connector_id(connector_key.downcast());
 

	
 
        // Note that we're not scheduling. That is done by the interface in case
 
        // it is actually needed.
 
        return interface;
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 
        self.inner.global_store.should_exit.store(true, Ordering::Release);
 
        let mut schedulers = self.inner.schedulers.lock().unwrap();
 
        for scheduler in schedulers.drain(..) {
 
            scheduler.join();
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/native.rs
Show inline comments
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::cell::Cell;
 
use std::sync::atomic::Ordering;
 
use crate::protocol::ComponentCreationError;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 
use crate::runtime2::global_store::ConnectorKey;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::port::{Port, PortKind};
 
use crate::runtime2::scheduler::ConnectorCtx;
 

	
 
use super::RuntimeInner;
 
use super::global_store::{ConnectorVariant, ConnectorId};
 
use super::global_store::ConnectorId;
 
use super::port::{Channel, PortIdLocal};
 
use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, DataMessage, SyncMessage};
 
use super::inbox::Message;
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub trait Connector {
 
    /// Handle a new message (preprocessed by the scheduler). You probably only
 
    /// want to handle `Data`, `Sync`, and `Solution` messages. The others are
 
    /// intended for the scheduler itself.
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 

	
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL),
 
}
 

	
 
/// The connector which an application can directly interface with. Once may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
pub struct ConnectorApplication {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(Vec::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        todo!("handling messages in ConnectorApplication (API for runtime)")
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    delta_state.new_ports.reserve(2);
 
                    delta_state.new_ports.push(endpoint_a);
 
                    delta_state.new_ports.push(endpoint_b);
 
                }
 
                ApplicationJob::NewConnector(connector) => {
 
                    delta_state.new_connectors.push(connector);
 
                }
 
            }
 
        }
 

	
 
        return ConnectorScheduling::NotNow;
 
    }
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<Port>,
 
    owned_ports: Vec<PortIdLocal>,
 
}
 

	
 
impl ApplicationInterface {
 
    pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner1>) -> Self {
 
    pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
        // TODO: Duplicated logic in scheduler
 
        let getter_id = self.runtime.global_store.connectors.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        self.owned_ports.push(Port{
 
        // Create ports and add a job such that they are transferred to the
 
        // API component. (note that we do not send a ping, this is only
 
        // necessary once we create a connector)
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            peer_connector: self.connector_id,
 
        });
 

	
 
        self.owned_ports.push(Port{
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            peer_connector: self.connector_id,
 
        });
 
        };
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push(ApplicationJob::NewChannel((getter_port, putter_port)));
 
        }
 

	
 
        // Add to owned ports for error checking while creating a connector
 
        self.owned_ports.reserve(2);
 
        self.owned_ports.push(putter_id);
 
        self.owned_ports.push(getter_id);
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Creates a new connector. Note that it is not scheduled immediately, but
 
    /// depends on the `ApplicationConnector` to run, followed by the created
 
    /// connector being scheduled.
 
    // TODO: Optimize by yanking out scheduler logic for common use.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for port_to_remove in &initial_ports {
 
            match self.owned_ports.iter().position(|v| v == port_to_remove) {
 
                Some(index_to_remove) => {
 
                    // We own the port, so continue
 
                    self.owned_ports.remove(index_to_remove)
 
                    self.owned_ports.remove(index_to_remove);
 
                },
 
                None => {
 
                    // We don't own the port
 
                    return Err(ComponentCreationError::UnownedPort);
 
                }
 
            }
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(Branch::new_initial_branch(state), initial_ports);
 

	
 
        // Put on job queue
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            queue.push(ApplicationJob::NewConnector(connector));
 
        }
 

	
 
        // Send ping message to wake up connector
 
        let connector = self.runtime.global_store.connectors.get_shared(self.connector_id);
 
        connector.inbox.insert_message(Message::Ping);
 
        connector.inbox.insert_message(Message{
 
            sending_connector: ConnectorId::new_invalid(),
 
            receiving_port: PortIdLocal::new_invalid(),
 
            contents: MessageContents::Ping,
 
        });
 

	
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
 
            self.runtime.global_store.connector_queue.push_back(key);
 
        }
 

	
 
        return Ok(());
 
    }
 

	
 
    /// Check if the next sync-round is finished.
 
    pub fn try_wait(&self) -> bool {
 
        let (is_done, _) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        return *lock;
 
    }
 

	
 
    /// Wait until the next sync-round is finished
 
    pub fn wait(&self) {
 
        let (is_done, condition) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        condition.wait_while(lock, |v| !*v); // wait while not done
 
    }
 

	
 
    /// Called by runtime to set associated connector's ID.
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        self.connector_id = id;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/port.rs
Show inline comments
 
use super::global_store::ConnectorId;
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 
pub(crate) struct PortIdLocal {
 
    pub index: u32,
 
}
 

	
 
impl PortIdLocal {
 
    pub fn new(id: u32) -> Self {
 
        Self{ index: id }
 
    }
 

	
 
    // TODO: Unsure about this, maybe remove, then also remove all struct
 
    //  instances where I call this
 
    pub fn new_invalid() -> Self {
 
        Self{ index: u32::MAX }
 
    }
 

	
 
    pub fn is_valid(&self) -> bool {
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
/// Represents a port inside of the runtime. May be without owner if it is
 
/// created by the application interfacing with the runtime, instead of being
 
/// created by a connector.
 
pub struct Port {
 
    pub self_id: PortIdLocal,
 
    pub peer_id: PortIdLocal,
 
    pub kind: PortKind,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase.
 
}
 

	
 

	
 

	
 
// TODO: Turn port ID into its own type
 
pub struct Channel {
 
    pub putter_id: PortIdLocal, // can put on it, so from the connector's point of view, this is an output
 
    pub getter_id: PortIdLocal, // vice versa: can get on it, so an input for the connector
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::Condvar;
 
use std::sync::atomic::{AtomicU32, Ordering};
 
use std::time::Duration;
 
use std::thread;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::global_store::ConnectorVariant;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::{Channel, PortKind, PortOwnership};
 
use crate::runtime2::port::{Channel, PortKind};
 

	
 
use super::RuntimeInner;
 
use super::port::{Port, PortIdLocal};
 
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
 
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
 
use super::inbox::{Message, ControlMessage, ControlMessageVariant};
 
use super::connector::{ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId};
 

	
 
/// Contains fields that are mostly managed by the scheduler, but may be
 
/// accessed by the connector
 
pub(crate) struct ConnectorCtx {
 
    pub(crate) id: ConnectorId,
 
    port_counter: Arc<AtomicU32>,
 
    pub(crate) ports: Vec<Port>,
 
}
 

	
 
impl ConnectorCtx {
 
    pub(crate) fn new(port_counter: Arc<AtomicU32>) -> ConnectorCtx {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            port_counter,
 
            ports: initial_ports,
 
            ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a (putter, getter) port pair belonging to the same channel. The
 
    /// port will be implicitly owned by the connector.
 
    pub(crate) fn create_channel(&mut self) -> Channel {
 
        let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            peer_connector: self.id,
 
        });
 

	
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            peer_connector: self.id,
 
        });
 

	
 
        return Channel{ getter_id, putter_id };
 
    }
 

	
 
    pub(crate) fn add_port(&mut self, port: Port) {
 
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
 
        self.ports.push(port);
 
    }
 

	
 
    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
 
        let index = self.port_id_to_index(id);
 
        return self.ports.remove(index);
 
    }
 

	
 
    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
 
        let index = self.port_id_to_index(id);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
 
        let index = self.port_id_to_index(id);
 
        return &mut self.ports[index];
 
    }
 

	
 
    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
 
        for (idx, port) in self.ports.iter().enumerate() {
 
            if port.self_id == id {
 
                return idx;
 
            }
 
        }
 

	
 
        panic!("port {:?}, not owned by connector", id);
 
    }
 
}
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
}
 

	
 
// Thinking aloud: actual ports should be accessible by connector, but managed
 
// by the scheduler (to handle rerouting messages). We could just give a read-
 
// only context, instead of an extra call on the "Connector" trait.
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{ runtime };
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        let mut delta_state = RunDeltaState::new();
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            let connector_key = self.runtime.global_store.connector_queue.pop_front();
 
            if connector_key.is_none() {
 
                // TODO: @Performance, needs condition or something, and most
 
                //  def' not sleeping
 
                thread::sleep(Duration::new(1, 0));
 
                if self.runtime.global_store.should_exit.load(Ordering::Acquire) {
 
                    // Thread exits!
 
                    break 'thread_loop;
 
                }
 

	
 
                continue 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    match message.contents {
 
                        MessageContents::Data(content) => {
 
                            // Check if we need to reroute, or can just put it
 
                            // in the private inbox of the connector
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, content.sending_port) {
 
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(content));
 
                            } else {
 
                                scheduled.connector.insert_data_message(content);
 
                            }
 
                        }
 
                        MessageContents::Sync(content) => {
 
                            scheduled.connector.insert_sync_message(content, &scheduled.context, &mut delta_state);
 
                        }
 
                        MessageContents::Solution(content) => {
 
                            // TODO: Handle solution message
 
                        },
 
                        MessageContents::Control(content) => {
 
                            match content.content {
 
                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                    // Need to change port target
 
                                    let port = scheduled.context.get_port_mut(port_id);
 
                                    port.peer_connector = new_target_connector_id;
 
                                    debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                    // And respond with an Ack
 
                                    // Note: after this code has been reached, we may not have any
 
                                    // messages in the outbox that send to the port whose owning
 
                                    // connector we just changed. This is because the `ack` will
 
                                    // clear the rerouting entry of the `ack`-receiver.
 
                                    self.send_message_and_wake_up_if_sleeping(
 
                                        content.sender,
 
                                        Message{
 
                                            sending_connector: connector_key.downcast(),
 
                                            receiving_port: PortIdLocal::new_invalid(),
 
                                            contents: MessageContents::Control(ControlMessage{
 
                                                id: content.id,
 
                                                content: ControlMessageVariant::Ack,
 
                                            }),
 
                                        }
 
                                    );
 
                                },
 
                                ControlMessageVariant::Ack => {
 
                                    scheduled.router.handle_ack(content.id);
 
                                }
 
                    // Check for rerouting
 
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                        self.send_message_and_wake_up_if_sleeping(other_connector_id, message);
 
                        continue;
 
                    }
 

	
 
                    // Check for messages that requires special action from the
 
                    // scheduler.
 
                    if let MessageContents::Control(content) = message.contents {
 
                        match content.content {
 
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                // Need to change port target
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.peer_connector = new_target_connector_id;
 
                                debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                // And respond with an Ack
 
                                // Note: after this code has been reached, we may not have any
 
                                // messages in the outbox that send to the port whose owning
 
                                // connector we just changed. This is because the `ack` will
 
                                // clear the rerouting entry of the `ack`-receiver.
 
                                self.send_message_and_wake_up_if_sleeping(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_key.downcast(),
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
                                            content: ControlMessageVariant::Ack,
 
                                        }),
 
                                    }
 
                                );
 
                            },
 
                            ControlMessageVariant::Ack => {
 
                                scheduled.router.handle_ack(content.id);
 
                            }
 
                        }
 
                        Message::Ping => {},
 
                    } else {
 
                        // Let connector handle message
 
                        scheduled.connector.handle_message(message.contents, &scheduled.context, &mut delta_state);
 
                    }
 
                }
 

	
 
                // Actually run the connector
 
                let new_schedule = scheduled.connector.run(
 
                    &self.runtime.protocol_description, &scheduled.context, &mut delta_state
 
                );
 

	
 
                // Handle all of the output from the current run: messages to
 
                // send and connectors to instantiate.
 
                self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);
 

	
 
                cur_schedule = new_schedule;
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
            // So enqueue it if requested, and otherwise put it in a sleeping
 
            // state.
 
            match cur_schedule {
 
                ConnectorScheduling::Immediate => unreachable!(),
 
                ConnectorScheduling::Later => {
 
                    // Simply queue it again later
 
                    self.runtime.global_store.connector_queue.push_back(connector_key);
 
                },
 
                ConnectorScheduling::NotNow => {
 
                    // Need to sleep, note that we are the only ones which are
 
                    // allows to set the sleeping state to `true`, and since
 
                    // we're running it must currently be `false`.
 
                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
 
                    scheduled.public.sleeping.store(true, Ordering::Release);
 

	
 
                    // We might have received a message in the meantime from a
 
                    // thread that did not see the sleeping flag set to `true`,
 
                    // so:
 
                    if !scheduled.public.inbox.is_empty() {
 
                        let should_reschedule_self = scheduled.public.sleeping
 
                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                            .is_ok();
 

	
 
                        if should_reschedule_self {
 
                            self.runtime.global_store.connector_queue.push_back(connector_key);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        // Handling any messages that were sent
 
        let connector_id = connector_key.downcast();
 

	
 
        if !delta_state.outbox.is_empty() {
 
            for mut message in delta_state.outbox.drain(..) {
 
                // Based on the message contents, decide where the message
 
                // should be sent to. This might end up modifying the message.
 
                let (peer_connector, peer_port) = match &mut message {
 
                    MessageContents::Data(contents) => {
 
                        let port = context.get_port(contents.sending_port);
 
                        (port.peer_connector, port.peer_id)
 
                    },
 
                    MessageContents::Sync(contents) => {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::RequestCommit(contents)=> {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::ConfirmCommit(contents) => {
 
                        for to_visit in &contents.to_visit {
 
                            let message = Message{
 
                                sending_connector: connector_id,
 
                                receiving_port: PortIdLocal::new_invalid(),
 
                                contents: contents.clone(),
 
                                contents: MessageContents::ConfirmCommit(contents.clone()),
 
                            };
 
                            self.send_message_and_wake_up_if_sleeping(*to_visit, message);
 
                        }
 
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::Control(_) | MessageContents::Ping => {
 
                        // Never generated by the user's code
 
                        unreachable!();
 
                    }
 
                };
 

	
 
                // TODO: Maybe clean this up, perhaps special case for
 
                //  ConfirmCommit can be handled differently.
 
                if peer_connector.is_valid() {
 
                    let message = Message {
 
                        sending_connector: connector_id,
 
                        receiving_port: peer_port,
 
                        contents: message,
 
                    };
 
                    self.send_message_and_wake_up_if_sleeping(peer_connector, message);
 
                }
 
            }
 
        }
 

	
 
        if !delta_state.new_ports.is_empty() {
 
            for port in delta_state.new_ports.drain(..) {
 
                context.ports.push(port);
 
            }
 
        }
 

	
 
        // Handling any new connectors that were scheduled
 
        // TODO: Pool outgoing messages to reduce atomic access
 
        if !delta_state.new_connectors.is_empty() {
 
            let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key);
 

	
 
            for new_connector in delta_state.new_connectors.drain(..) {
 
                // Add to global registry to obtain key
 
                let new_key = self.runtime.global_store.connectors.create(cur_connector, ConnectorVariant::UserDefined(new_connector));
 
                let new_key = self.runtime.global_store.connectors.create_pdl(cur_connector, new_connector);
 
                let new_connector = self.runtime.global_store.connectors.get_mut(&new_key);
 

	
 
                // Call above changed ownership of ports, but we still have to
 
                // let the other end of the channel know that the port has
 
                // changed location.
 
                for port in &new_connector.context.ports {
 
                    let reroute_message = cur_connector.router.prepare_reroute(
 
                        port.self_id, port.peer_id, cur_connector.context.id,
 
                        port.peer_connector, new_connector.context.id
 
                    );
 

	
 
                    self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message);
 
                    self.send_message_and_wake_up_if_sleeping(port.peer_connector, reroute_message);
 
                }
 

	
 
                // Schedule new connector to run
 
                self.runtime.global_store.connector_queue.push_back(new_key);
 
            }
 
        }
 
    }
 

	
 
    pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
 
    fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
 
        let connector = self.runtime.global_store.connectors.get_shared(connector_id);
 

	
 
        connector.inbox.insert_message(message);
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe { ConnectorKey::from_id(connector_id) };
 
            self.runtime.global_store.connector_queue.push_back(key);
 
        }
 
    }
 
}
 

	
 
/// Represents a rerouting entry due to a moved port
 
// TODO: Optimize
 
struct ReroutedTraffic {
 
    id: u32,                        // ID of control message
 
    port: PortIdLocal,              // targeted port
 
    target_port: PortIdLocal,       // targeted port
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector they should be rerouted to
 
}
 

	
 
pub(crate) struct Router {
 
    id_counter: u32,
 
    active: Vec<ReroutedTraffic>,
 
}
 

	
 
impl Router {
 
    pub fn new() -> Self {
 
        Router{
 
            id_counter: 0,
 
            active: Vec::new(),
 
        }
 
    }
 

	
 
    /// Prepares rerouting messages due to changed ownership of a port. The
 
    /// control message returned by this function must be sent to the
 
    /// transferred port's peer connector.
 
    pub fn prepare_reroute(
 
        &mut self,
 
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
 
        new_owner_connector_id: ConnectorId
 
    ) -> Message {
 
        let id = self.id_counter;
 
        self.id_counter.overflowing_add(1);
 

	
 
        self.active.push(ReroutedTraffic{
 
            id,
 
            port: port_id,
 
            target_port: port_id,
 
            source_connector: peer_connector_id,
 
            target_connector: new_owner_connector_id,
 
        });
 

	
 
        return Message::Control(ControlMessage{
 
            id,
 
            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
 
        });
 
        return Message{
 
            sending_connector: self_connector_id,
 
            receiving_port: PortIdLocal::new_invalid(),
 
            contents: MessageContents::Control(ControlMessage{
 
                id,
 
                content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),
 
            })
 
        };
 
    }
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, sending_port: PortIdLocal) -> Option<ConnectorId> {
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for reroute in &self.active {
 
            if reroute.source_connector == sending_connector &&
 
                reroute.port == sending_port {
 
                reroute.target_port == target_port {
 
                // Need to reroute this message
 
                return Some(reroute.target_connector);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Handles an Ack as an answer to a previously sent control message
 
    pub fn handle_ack(&mut self, id: u32) {
 
        let index = self.active.iter()
 
            .position(|v| v.id == id);
 

	
 
        match index {
 
            Some(index) => { self.active.remove(index); },
 
            None => { todo!("handling of nefarious ACKs"); },
 
        }
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)