Changeset - cc8030b35903
[Not reviewed]
1 5 0
MH - 4 years ago 2021-09-07 17:41:31
contact@maxhenger.nl
more WIP on runtime revisiting
6 files changed with 597 insertions and 55 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -49,13 +49,13 @@ pub struct U32Stream {
 
#[derive(Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize)]
 
pub struct ComponentId(Id); // PUB because it can be returned by errors
 

	
 
/// Identifier of a port in a session
 
#[derive(Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize)]
 
#[repr(transparent)]
 
pub struct PortId(Id);
 
pub struct PortId(pub(crate) Id);
 

	
 
/// A safely aliasable heap-allocated payload of message bytes
 
#[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)]
 
pub struct Payload(pub Arc<Vec<u8>>);
 
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]
 

	
src/protocol/eval/executor.rs
Show inline comments
 
@@ -196,15 +196,16 @@ pub enum EvalContinuation {
 
    Stepping,
 
    Inconsistent,
 
    Terminal,
 
    SyncBlockStart,
 
    SyncBlockEnd,
 
    NewComponent(DefinitionId, i32, ValueGroup),
 
    BlockFires(Value),
 
    BlockGet(Value),
 
    Put(Value, Value),
 
    NewChannel,
 
    BlockFires(PortId),
 
    BlockGet(PortId),
 
    Put(PortId, Value),
 
}
 

	
 
// Note: cloning is fine, methinks. cloning all values and the heap regions then
 
// we end up with valid "pointers" to heap regions.
 
#[derive(Debug, Clone)]
 
pub struct Prompt {
 
@@ -227,13 +228,20 @@ impl Prompt {
 
        args.into_store(&mut prompt.store);
 
        prompt.store.reserve_stack(max_stack_size);
 

	
 
        prompt
 
    }
 

	
 
    pub(crate) fn step(&mut self, types: &TypeTable, heap: &Heap, modules: &[Module], ctx: &mut EvalContext) -> EvalResult {
 
    /// Big 'ol function right here. Didn't want to break it up unnecessarily.
 
    /// It consists of, in sequence: executing any expressions that should be
 
    /// executed before the next statement can be evaluated, then a section that
 
    /// performs debug printing, and finally a section that takes the next
 
    /// statement and executes it. If the statement requires any expressions to
 
    /// be evaluated, then they will be added such that the next time `step` is
 
    /// called, all of these expressions are indeed evaluated.
 
    pub(crate) fn step(&mut self, types: &TypeTable, heap: &Heap, modules: &[Module], ctx: &mut impl RunContext) -> EvalResult {
 
        // Helper function to transfer multiple values from the expression value
 
        // array into a heap region (e.g. constructing arrays or structs).
 
        fn transfer_expression_values_front_into_heap(cur_frame: &mut Frame, store: &mut Store, num_values: usize) -> HeapPos {
 
            let heap_pos = store.alloc_heap();
 

	
 
            // Do the transformation first (because Rust...)
 
@@ -567,26 +575,42 @@ impl Prompt {
 
                            // fancy shenanigans at all, just push the result.
 
                            match expr.method {
 
                                Method::Get => {
 
                                    let value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let value = self.store.maybe_read_ref(&value).clone();
 

	
 
                                    match ctx.get(value.clone(), &mut self.store) {
 
                                    let port_id = if let Value::Input(port_id) = value {
 
                                        port_id
 
                                    } else {
 
                                        unreachable!("executor calling 'get' on value {:?}", value)
 
                                    };
 

	
 
                                    match ctx.get(port_id, &mut self.store) {
 
                                        Some(result) => {
 
                                            // We have the result
 
                                            cur_frame.expr_values.push_back(result)
 
                                        },
 
                                        None => {
 
                                            // Don't have the result yet, prepare the expression to
 
                                            // get run again after we've received a message.
 
                                            cur_frame.expr_values.push_front(value.clone());
 
                                            cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                            return Ok(EvalContinuation::BlockGet(value));
 
                                            return Ok(EvalContinuation::BlockGet(port_id));
 
                                        }
 
                                    }
 
                                },
 
                                Method::Put => {
 
                                    let port_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let deref_port_value = self.store.maybe_read_ref(&port_value).clone();
 

	
 
                                    let port_id = if let Value::Output(port_id) = deref_port_value {
 
                                        port_id
 
                                    } else {
 
                                        unreachable!("executor calling 'put' on value {:?}", deref_port_value)
 
                                    };
 

	
 
                                    let msg_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let deref_msg_value = self.store.maybe_read_ref(&msg_value).clone();
 

	
 
                                    match deref_msg_value {
 
                                        Value::Message(_) => {},
 
                                        _ => {
 
@@ -594,31 +618,38 @@ impl Prompt {
 
                                                self, modules, heap, expr_id,
 
                                                String::from("Calls to `put` are currently restricted to only send instances of `msg` types. This will change in the future")
 
                                            ));
 
                                        }
 
                                    }
 

	
 
                                    if ctx.did_put(deref_port_value.clone()) {
 
                                    if ctx.did_put(port_id) {
 
                                        // We're fine, deallocate in case the expression value stack
 
                                        // held an owned value
 
                                        self.store.drop_value(msg_value.get_heap_pos());
 
                                    } else {
 
                                        cur_frame.expr_values.push_front(msg_value);
 
                                        cur_frame.expr_values.push_front(port_value);
 
                                        cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                        return Ok(EvalContinuation::Put(deref_port_value, deref_msg_value));
 
                                        return Ok(EvalContinuation::Put(port_id, deref_msg_value));
 
                                    }
 
                                },
 
                                Method::Fires => {
 
                                    let port_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let port_value_deref = self.store.maybe_read_ref(&port_value).clone();
 
                                    match ctx.fires(port_value_deref.clone()) {
 

	
 
                                    let port_id = match port_value_deref {
 
                                        Value::Input(port_id) => port_id,
 
                                        Value::Output(port_id) => port_id,
 
                                        _ => unreachable!("executor calling 'fires' on value {:?}", value),
 
                                    };
 

	
 
                                    match ctx.fires(port_id) {
 
                                        None => {
 
                                            cur_frame.expr_values.push_front(port_value);
 
                                            cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                            return Ok(EvalContinuation::BlockFires(port_value_deref));
 
                                            return Ok(EvalContinuation::BlockFires(port_id));
 
                                        },
 
                                        Some(value) => {
 
                                            cur_frame.expr_values.push_back(value);
 
                                        }
 
                                    }
 
                                },
 
@@ -762,23 +793,31 @@ impl Prompt {
 
                match stmt {
 
                    LocalStatement::Memory(stmt) => {
 
                        let variable = &heap[stmt.variable];
 
                        self.store.write(ValueId::Stack(variable.unique_id_in_scope as u32), Value::Unassigned);
 

	
 
                        cur_frame.position = stmt.next;
 
                        Ok(EvalContinuation::Stepping)
 
                    },
 
                    LocalStatement::Channel(stmt) => {
 
                        let [from_value, to_value] = ctx.new_channel();
 
                        self.store.write(ValueId::Stack(heap[stmt.from].unique_id_in_scope as u32), from_value);
 
                        self.store.write(ValueId::Stack(heap[stmt.to].unique_id_in_scope as u32), to_value);
 

	
 
                        // Need to create a new channel by requesting it from
 
                        // the runtime.
 
                        match ctx.get_channel() {
 
                            None => {
 
                                // No channel is pending. So request one
 
                                Ok(EvalContinuation::NewChannel)
 
                            },
 
                            Some((put_port, get_port)) => {
 
                                self.store.write(ValueId::Stack(heap[stmt.from].unique_id_in_scope as u32), put_port);
 
                                self.store.write(ValueId::Stack(heap[stmt.to].unique_id_in_scope as u32), get_port);
 
                                cur_frame.position = stmt.next;
 
                                Ok(EvalContinuation::Stepping)
 
                            }
 
                        }
 
                    }
 
                }
 

	
 
                Ok(EvalContinuation::Stepping)
 
            },
 
            Statement::Labeled(stmt) => {
 
                cur_frame.position = stmt.body;
 

	
 
                Ok(EvalContinuation::Stepping)
 
            },
src/protocol/mod.rs
Show inline comments
 
@@ -23,20 +23,20 @@ pub struct Module {
 
    pub(crate) root_id: RootId,
 
    pub(crate) name: Option<StringRef<'static>>,
 
}
 
/// Description of a protocol object, used to configure new connectors.
 
#[repr(C)]
 
pub struct ProtocolDescription {
 
    modules: Vec<Module>,
 
    heap: Heap,
 
    types: TypeTable,
 
    pool: Mutex<StringPool>,
 
    pub(crate) modules: Vec<Module>,
 
    pub(crate) heap: Heap,
 
    pub(crate) types: TypeTable,
 
    pub(crate) pool: Mutex<StringPool>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct ComponentState {
 
    prompt: Prompt,
 
    pub(crate) prompt: Prompt,
 
}
 

	
 
#[allow(dead_code)]
 
pub(crate) enum EvalContext<'a> {
 
    Nonsync(&'a mut NonsyncProtoContext<'a>),
 
    Sync(&'a mut SyncProtoContext<'a>),
 
@@ -140,12 +140,13 @@ impl ProtocolDescription {
 
            }
 
        }
 

	
 
        let module_root = self.lookup_module_root(module_name).unwrap();
 
        let root = &self.heap[module_root];
 
        let def = root.get_definition_ident(&self.heap, identifier).unwrap();
 
        // TODO: Check for polymorph
 
        ComponentState { prompt: Prompt::new(&self.types, &self.heap, def, 0, ValueGroup::new_stack(args)) }
 
    }
 

	
 
    fn lookup_module_root(&self, module_name: &[u8]) -> Option<RootId> {
 
        for module in self.modules.iter() {
 
            match &module.name {
 
@@ -158,12 +159,73 @@ impl ProtocolDescription {
 
            }
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
// TODO: @temp Should just become a concrete thing that is passed in
 
pub trait RunContext {
 
    fn did_put(&self, port: PortId) -> bool;
 
    fn get(&self, port: PortId) -> Option<Value>; // None if still waiting on message
 
    fn fires(&self, port: PortId) -> Option<Value>; // None if not yet branched
 
    fn get_channel(&self) -> Option<(Value, Value)>; // None if not yet prepared
 
}
 

	
 
#[derive(Debug)]
 
pub enum RunResult {
 
    // Can only occur outside sync blocks
 
    ComponentTerminated, // component has exited its procedure
 
    ComponentAtSyncStart,
 
    NewComponent(DefinitionId, i32, ValueGroup), // should also be possible inside sync
 
    NewChannel, // should also be possible inside sync
 
    // Can only occur inside sync blocks
 
    BranchInconsistent, // branch has inconsistent behaviour
 
    BranchMissingPortState(PortId), // branch doesn't know about port firing
 
    BranchMissingPortValue(PortId), // branch hasn't received message on input port yet
 
    BranchAtSyncEnd,
 
    BranchPut(PortId, ValueGroup),
 
}
 

	
 
impl ComponentState {
 
    pub(crate) fn run(&mut self, ctx: &mut impl RunContext, pd: &ProtocolDescription) -> RunResult {
 
        use EvalContinuation as EC;
 
        use RunResult as RR;
 

	
 
        loop {
 
            let step_result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, ctx);
 
            match step_result {
 
                Err(reason) => {
 
                    // TODO: @temp
 
                    println!("Evaluation error:\n{}", reason);
 
                    todo!("proper error handling/bubbling up");
 
                },
 
                Ok(continuation) => match continuation {
 
                    // TODO: Probably want to remove this translation
 
                    EC::Stepping => continue,
 
                    EC::Inconsistent => return RR::BranchInconsistent,
 
                    EC::Terminal => return RR::ComponentTerminated,
 
                    EC::SyncBlockStart => return RR::ComponentAtSyncStart,
 
                    EC::SyncBlockEnd => return RR::BranchAtSyncEnd,
 
                    EC::NewComponent(definition_id, monomorph_idx, args) =>
 
                        return RR::NewComponent(definition_id, monomorph_idx, arg),
 
                    EC::NewChannel =>
 
                        return RR::NewChannel,
 
                    EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id),
 
                    EC::BlockGet(port_id) => return RR::BranchMissingPortValue(port_id),
 
                    EC::Put(port_id, value) => {
 
                        let value_group = ValueGroup::from_store(&self.prompt.store, &[value]);
 
                        return RR::BranchPut(port_id, value_group);
 
                    },
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
// TODO: @remove the old stuff
 
impl ComponentState {
 
    pub(crate) fn nonsync_run<'a: 'b, 'b>(
 
        &'a mut self,
 
        context: &'b mut NonsyncProtoContext<'b>,
 
        pd: &'a ProtocolDescription,
 
    ) -> NonsyncBlocker {
 
@@ -258,22 +320,12 @@ impl ComponentState {
 
                        Value::Input(port) => {
 
                            return SyncBlocker::CouldntReadMsg(port);
 
                        }
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::Put(port, message) => {
 
                        let value;
 
                        match port {
 
                            Value::Output(port_value) => {
 
                                value = port_value;
 
                            }
 
                            Value::Input(port_value) => {
 
                                value = port_value;
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        let payload;
 
                        match message {
 
                            Value::Null => {
 
                                return SyncBlocker::Inconsistent;
 
                            },
 
                            Value::Message(heap_pos) => {
 
@@ -284,13 +336,13 @@ impl ComponentState {
 
                                    bytes.push(value.as_uint8());
 
                                }
 
                                payload = Payload(Arc::new(bytes));
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        return SyncBlocker::PutMsg(value, payload);
 
                        return SyncBlocker::PutMsg(port, payload);
 
                    }
 
                },
 
            }
 
        }
 
    }
 
}
src/runtime2/mod.rs
Show inline comments
 
mod runtime;
 
\ No newline at end of file
 
mod registry;
 
\ No newline at end of file
src/runtime2/registry.rs
Show inline comments
 
deleted file
src/runtime2/runtime.rs
Show inline comments
 
use std::sync::Arc;
 
use std::collections::{HashMap, VecDeque};
 
use std::collections::hash_map::{Entry};
 

	
 
use crate::runtime::error as old_error;
 

	
 
use crate::Polarity;
 
use crate::{Polarity, PortId};
 
use crate::common::Id;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::registry::Registry;
 

	
 
enum AddComponentError {
 
    ModuleDoesNotExist,
 
    ConnectorDoesNotExist,
 
    InvalidArgumentType(usize), // value is index of (first) invalid argument
 
}
 

	
 
struct PortDesc {
 
    id: u32,
 
    peer_id: u32,
 
    owning_connector_id: Option<u32>,
 
    is_getter: bool, // otherwise one can only call `put`
 
}
 

	
 
// Message received from some kind of peer
 
struct BufferedMessage {
 
    // If in inbox, then sender is the connector's peer. If in the outbox, then
 
    // the sender is the connector itself.
 
    sending_port: PortId,
 
    receiving_port: PortId,
 
    peer_prev_branch_id: Option<u32>, // of the sender
 
    peer_cur_branch_id: u32, // of the sender
 
    message: ValueGroup,
 
}
 

	
 
struct ConnectorDesc {
 
    id: u32,
 
    in_sync: bool,
 
    branches: Vec<BranchDesc>, // first one is always non-speculative one
 
    branch_id_counter: u32,
 
    spec_branches_active: VecDeque<u32>, // branches that can be run immediately
 
    spec_branches_pending_receive: HashMap<PortId, u32>, // from port_id to branch index
 
    global_inbox: HashMap<(PortId, u32), BufferedMessage>,
 
    global_outbox: HashMap<(PortId, u32), BufferedMessage>,
 
}
 

	
 
impl ConnectorDesc {
 
    /// Creates a new connector description. Implicit assumption is that there
 
    /// is one (non-sync) branch that can be immediately executed.
 
    fn new(id: u32, component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        let mut branches_active = VecDeque::new();
 
        branches_active.push_back(0);
 

	
 
        Self{
 
            id,
 
            in_sync: false,
 
            branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)],
 
            branch_id_counter: 1,
 
            spec_branches_active: branches_active,
 
            spec_branches_pending_receive: HashMap::new(),
 
            global_inbox: HashMap::new(),
 
            global_outbox: HashMap::new(),
 
        }
 
    }
 
}
 

	
 
enum BranchState {
 
    RunningNonSync, // regular running non-speculative branch
 
    RunningSync, // regular running speculative branch
 
    BranchPoint, // branch which ended up being a branching point
 
    ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution
 
    Failed, // branch that became inconsistent
 
}
 

	
 
struct BranchPortDesc {
 
    last_registered_identifier: Option<u32>, // if putter, then last sent branch ID, if getter, then last received branch ID
 
    num_times_fired: u32, // number of puts/gets on this port
 
}
 

	
 
struct BranchDesc {
 
    index: u32,
 
    parent_index: Option<u32>,
 
    identifier: u32,
 
    code_state: ComponentState,
 
    branch_state: BranchState,
 
    owned_ports: Vec<u32>,
 
    message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value
 
    port_mapping: HashMap<PortId, BranchPortDesc>,
 
}
 

	
 
impl BranchDesc {
 
    /// Creates the first non-sync branch of a connector
 
    fn new_non_sync(component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        Self{
 
            index: 0,
 
            parent_index: None,
 
            identifier: 0,
 
            code_state: component_state,
 
            branch_state: BranchState::RunningNonSync,
 
            owned_ports,
 
            message_inbox: HashMap::new(),
 
            port_mapping: HashMap::new(),
 
        }
 
    }
 

	
 
    /// Creates a sync branch based on the supplied branch. This supplied branch
 
    /// is the branching point for the new one, i.e. the parent in the branching
 
    /// tree.
 
    fn new_sync_from(index: u32, identifier: u32, branch_state: &BranchDesc) -> Self {
 
        Self{
 
            index,
 
            parent_index: Some(branch_state.index),
 
            identifier,
 
            code_state: branch_state.code_state.clone(),
 
            branch_state: BranchState::RunningSync,
 
            owned_ports: branch_state.owned_ports.clone(),
 
            message_inbox: branch_state.message_inbox.clone(),
 
            port_mapping: branch_state.port_mapping.clone(),
 
        }
 
    }
 
}
 

	
 
// Separate from Runtime for borrowing reasons
 
struct Registry {
 
    ports: HashMap<u32, PortDesc>,
 
    port_counter: u32,
 
    connectors: HashMap<u32, ConnectorDesc>,
 
    connector_counter: u32,
 
}
 

	
 
impl Registry {
 
    fn new() -> Self {
 
        Self{
 
            ports: HashMap::new(),
 
            port_counter: 0,
 
            connectors: HashMap::new(),
 
            connector_counter: 0,
 
        }
 
    }
 

	
 
    /// Returns (putter_port, getter_port)
 
    pub fn add_channel(&mut self, owning_connector_id: Option<u32>) -> (u32, u32) {
 
        let get_id = self.generate_port_id();
 
        let put_id = self.generate_port_id();
 

	
 
        self.ports.insert(get_id, PortDesc{
 
            id: get_id,
 
            peer_id: put_id,
 
            owning_connector_id,
 
            is_getter: true,
 
        });
 
        self.ports.insert(put_id, PortDesc{
 
            id: put_id,
 
            peer_id: get_id,
 
            owning_connector_id,
 
            is_getter: false,
 
        });
 

	
 
        return (put_id, get_id);
 
    }
 

	
 
    fn generate_port_id(&mut self) -> u32 {
 
        let id = self.port_counter;
 
        self.port_counter += 1;
 
        return id;
 
    }
 
}
 

	
 
// TODO: @performance, use freelists+ids instead of HashMaps
 
struct Runtime {
 
    protocol: Arc<ProtocolDescription>,
 

	
 
    registry: Registry,
 
    connectors_active: VecDeque<u32>,
 
}
 

	
 
impl Runtime {
 
    pub fn new(pd: Arc<ProtocolDescription>) -> Self {
 
        Self{ protocol: pd }
 
        Self{
 
            protocol: pd,
 
            registry: Registry::new(),
 
            connectors_active: VecDeque::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel that is not owned by any connector and returns its
 
    /// endpoints. The returned values are of the (putter port, getter port)
 
    /// respectively.
 
    pub fn add_channel(&mut self) -> (Value, Value) {
 
        let (put_id, get_id) = self.registry.add_channel(None);
 
        return (
 
            port_value_from_id(None, put_id, true),
 
            port_value_from_id(None, get_id, false)
 
        );
 
    }
 

	
 
    pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> {
 
        use AddComponentError as ACE;
 
        use old_error::AddComponentError as OldACE;
 
        use crate::runtime::error::AddComponentError as OldACE;
 

	
 
        // TODO: Allow the ValueGroup to contain any kind of value
 
        // TODO: Remove the responsibility of adding a component from the PD
 

	
 
        // Lookup module and the component
 
        // TODO: Remove this error enum translation
 
        // TODO: Remove this error enum translation. Note that for now this
 
        //  function forces port-only arguments
 
        let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) {
 
            Ok(polarities) => polarities,
 
            Err(reason) => match reason {
 
                OldACE::NonPortTypeParameters => return Err(ACE::InvalidArgumentType(0)),
 
                OldACE::NoSuchModule => return Err(ACE::ModuleDoesNotExist),
 
                OldACE::NoSuchComponent => return Err(ACE::ModuleDoesNotExist),
 
                _ => unreachable!(),
 
            }
 
        };
 

	
 
        // Make sure supplied values (and types) are correct
 
        let mut ports = Vec::with_capacity(values.values.len());
 
        
 
        for (value_idx, value) in values.values.iter().enumerate() {
 
            let polarity = &port_polarities[value_idx];
 

	
 
            match value {
 
                Value::Input(port_id) => {
 
                    if *polarity != Polarity::Getter {
 
@@ -65,9 +236,304 @@ impl Runtime {
 
                },
 
                _ => return Err(ACE::InvalidArgumentType(value_idx))
 
            }
 
        }
 

	
 
        // Instantiate the component
 
        let component_id = self.generate_connector_id();
 
        let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports);
 
        let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 

	
 
        self.registry.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports));
 
        self.connectors_active.push_back(component_id);
 

	
 
        Ok(())
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Go through all active connectors
 
        while !self.connectors_active.is_empty() {
 
            let next_id = self.connectors_active.pop_front().unwrap();
 
            self.run_connector(next_id);
 
        }
 
    }
 

	
 
    /// Runs a connector for as long as sensible, then returns `true` if the
 
    /// connector should be run again in the future, and return `false` if the
 
    /// connector has terminated. Note that a terminated connector still 
 
    /// requires cleanup.
 
    pub fn run_connector(&mut self, id: u32) -> bool {
 
        let desc = self.registry.connectors.get_mut(&id).unwrap();
 
        let mut run_context = Context{
 
            connector_id: id,
 
            branch_id: None,
 
            pending_channel: None,
 
        };
 

	
 
        let mut call_again = false;
 

	
 
        while call_again {
 
            call_again = false; // bit of a silly pattern, maybe revise
 

	
 
            if desc.in_sync {
 
                // Running in synchronous mode, so run all branches until their
 
                // blocking point
 
                debug_assert!(!desc.spec_branches_active.is_empty());
 
                let branch_index = desc.spec_branches_active.pop_front().unwrap();
 

	
 
                let branch = &mut desc.branches[branch_index as usize];
 
                let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
                match run_result {
 
                    RunResult::BranchInconsistent => {
 
                        // Speculative branch became inconsistent. So we don't
 
                        // run it again
 
                        branch.branch_state = BranchState::Failed;
 
                    },
 
                    RunResult::BranchMissingPortState(port_id) => {
 
                        // Branch called `fires()` on a port that did not have a
 
                        // value assigned yet. So branch and keep running
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        debug_assert!(branch.port_mapping.get(&port_id).is_none());
 

	
 
                        let copied_index = Self::duplicate_branch(desc, branch_index);
 

	
 
                        // Need to re-borrow to assign changed port state
 
                        let original_branch = &mut desc.branches[branch_index as usize];
 
                        original_branch.port_mapping.insert(port_id, BranchPortDesc{
 
                            last_registered_identifier: None,
 
                            num_times_fired: 0,
 
                        });
 

	
 
                        let copied_branch = &mut desc.branches[copied_index as usize];
 
                        copied_branch.port_mapping.insert(port_id, BranchPortDesc{
 
                            last_registered_identifier: None,
 
                            num_times_fired: 1,
 
                        });
 

	
 
                        // Run both again
 
                        desc.spec_branches_active.push_back(branch_index);
 
                        desc.spec_branches_active.push_back(copied_index);
 
                    },
 
                    RunResult::BranchMissingPortValue(port_id) => {
 
                        // Branch just performed a `get()` on a port that did
 
                        // not yet receive a value.
 

	
 
                        // First check if a port value is assigned to the
 
                        // current branch. If so, check if it is consistent.
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        match branch.port_mapping.entry(port_id) {
 
                            Entry::Vacant(entry) => {
 
                                // No entry yet, so force to firing
 
                                entry.insert(BranchPortDesc{
 
                                    last_registered_identifier: None,
 
                                    num_times_fired: 1,
 
                                });
 
                                branch.branch_state = BranchState::BranchPoint;
 
                                desc.spec_branches_pending_receive.insert(port_id, branch_index);
 
                            },
 
                            Entry::Occupied(entry) => {
 
                                // Have an entry, check if it is consistent
 
                                let entry = entry.get();
 
                                if entry.num_times_fired == 0 {
 
                                    // Inconsistent
 
                                    branch.branch_state = BranchState::Failed;
 
                                } else {
 
                                    // Perfectly fine, add to queue
 
                                    branch.branch_state = BranchState::BranchPoint;
 
                                    desc.spec_branches_pending_receive.insert(port_id, branch_index);
 
                                }
 
                            }
 
                        }
 
                    },
 
                    RunResult::BranchAtSyncEnd => {
 
                        branch.branch_state = BranchState::ReachedEndSync;
 
                        todo!("somehow propose solution");
 
                    },
 
                    RunResult::BranchPut(port_id, value_group) => {
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        debug_assert_eq!(value_group.values.len(), 1)
 
                    },
 
                    _ => unreachable!("got result '{:?}' from running component in sync mode", run_result),
 
                }
 
            } else {
 
                // Running in non-synchronous mode
 
                let branch = &mut desc.branches[0];
 
                let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
                match run_result {
 
                    RunResult::ComponentTerminated => return false,
 
                    RunResult::ComponentAtSyncStart => {
 
                        // Prepare for sync execution
 
                        Self::prepare_branch_for_sync(desc);
 
                        call_again = true;
 
                    },
 
                    RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                        // Generate a new connector with its own state
 
                        let new_component_id = self.generate_connector_id();
 
                        let new_component_state = ComponentState {
 
                            prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments)
 
                        };
 

	
 
                        // Transfer the ownership of any ports to the new connector
 
                        let mut ports = Vec::with_capacity(arguments.values.len());
 
                        find_ports_in_value_group(&arguments, &mut ports);
 
                        for port_id in &ports {
 
                            let port = self.registry.ports.get_mut(&port_id.0.u32_suffix).unwrap();
 
                            debug_assert_eq!(port.owning_connector_id.unwrap(), run_context.connector_id);
 
                            port.owning_connector_id = Some(new_component_id)
 
                        }
 

	
 
                        // Finally push the new connector into the registry
 
                        let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 
                        self.registry.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports));
 
                        self.connectors_active.push_back(new_component_id);
 
                    },
 
                    RunResult::NewChannel => {
 
                        // Prepare channel
 
                        debug_assert!(run_context.pending_channel.is_none());
 
                        let (put_id, get_id) = self.registry.add_channel(Some(run_context.connector_id));
 
                        run_context.pending_channel = Some((
 
                            port_value_from_id(Some(run_context.connector_id), put_id, true),
 
                            port_value_from_id(Some(run_context.connector_id), get_id, false)
 
                        ));
 

	
 
                        // Call again so it is retrieved from the context
 
                        call_again = true;
 
                    },
 
                    _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result),
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    fn generate_connector_id(&mut self) -> u32 {
 
        let id = self.registry.connector_counter;
 
        self.registry.connector_counter += 1;
 
        return id;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Helpers for branch management
 
    // -------------------------------------------------------------------------
 

	
 
    /// Prepares a speculative branch for further execution from the connector's
 
    /// non-speculative base branch.
 
    fn prepare_branch_for_sync(desc: &mut ConnectorDesc) {
 
        // Ensure only one branch is active, the non-sync branch
 
        debug_assert!(!desc.in_sync);
 
        debug_assert_eq!(desc.branches.len(), 1);
 
        debug_assert!(desc.spec_branches_active.is_empty());
 
        let new_branch_index = 1;
 
        let new_branch_identifier = desc.branch_id_counter;
 
        desc.branch_id_counter += 1;
 

	
 
        // Push first speculative branch as active branch
 
        let new_branch = BranchDesc::new_sync_from(new_branch_index, new_branch_identifier, &desc.branches[0]);
 
        desc.branches.push(new_branch);
 
        desc.spec_branches_active.push_back(new_id);
 
        desc.in_sync = true;
 
    }
 

	
 
    /// Duplicates a particular (speculative) branch and returns its index.
 
    fn duplicate_branch(desc: &mut ConnectorDesc, original_branch_idx: u32) -> u32 {
 
        let original_branch = &desc.branches[original_branch_idx as usize];
 
        debug_assert!(desc.in_sync);
 

	
 
        let copied_index = desc.branches.len() as u32;
 
        let copied_id = desc.branch_id_counter;
 
        desc.branch_id_counter += 1;
 

	
 
        let copied_branch = BranchDesc::new_sync_from(copied_index, copied_id, original_branch);
 
        desc.branches.push(copied_branch);
 

	
 
        return copied_index;
 
    }
 
}
 

	
 
/// Context accessible by the code while being executed by the runtime. When the
 
/// code is being executed by the runtime it sometimes needs to interact with 
 
/// the runtime. This is achieved by the "code throwing an error code", after 
 
/// which the runtime modifies the appropriate variables and continues executing
 
/// the code again. 
 
struct Context<'a> {
 
    // Properties of currently running connector/branch
 
    connector_id: u32,
 
    branch_id: Option<u32>,
 
    // Resources ready to be retrieved by running code
 
    pending_channel: Option<(Value, Value)>, // (put, get) ports
 
}
 

	
 
impl<'a> crate::protocol::RunContext for Context<'a> {
 
    fn did_put(&self, port: PortId) -> bool {
 
        todo!()
 
    }
 

	
 
    fn get(&self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn fires(&self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        self.pending_channel.take()
 
    }
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports. 
 
/// Duplicates will only be added once.
 
fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                for prev_port in ports {
 
                    if prev_port == port_id {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 
                
 
                ports.push(*port_id);
 
            },
 
            Value::Array(heap_pos) | 
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 
    
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 

	
 
fn port_value_from_id(connector_id: Option<u32>, port_id: u32, is_output: bool) -> Value {
 
    let connector_id = connector_id.unwrap_or(u32::MAX); // TODO: @hack, review entire PortId/ConnectorId/Id system
 
    if is_output {
 
        return Value::Output(PortId(Id{
 
            connector_id,
 
            u32_suffix: port_id
 
        }));
 
    } else {
 
        return Value::Input(PortId(Id{
 
            connector_id,
 
            u32_suffix: port_id,
 
        }));
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)