diff --git a/src/common.rs b/src/common.rs index 2a6d8069b10b66f9aa9353131cae7d157c718c30..537487d427612edb0a78dec1f1462bfd040f1db5 100644 --- a/src/common.rs +++ b/src/common.rs @@ -52,7 +52,7 @@ pub struct ComponentId(Id); // PUB because it can be returned by errors /// Identifier of a port in a session #[derive(Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize)] #[repr(transparent)] -pub struct PortId(Id); +pub struct PortId(pub(crate) Id); /// A safely aliasable heap-allocated payload of message bytes #[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)] diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index e7bb604b13a60c3d851918811cc9ae566d8686bb..f95a61b3c62f75ac315120a8b9dfa3baac09d2a3 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -199,9 +199,10 @@ pub enum EvalContinuation { SyncBlockStart, SyncBlockEnd, NewComponent(DefinitionId, i32, ValueGroup), - BlockFires(Value), - BlockGet(Value), - Put(Value, Value), + NewChannel, + BlockFires(PortId), + BlockGet(PortId), + Put(PortId, Value), } // Note: cloning is fine, methinks. cloning all values and the heap regions then @@ -230,7 +231,14 @@ impl Prompt { prompt } - pub(crate) fn step(&mut self, types: &TypeTable, heap: &Heap, modules: &[Module], ctx: &mut EvalContext) -> EvalResult { + /// Big 'ol function right here. Didn't want to break it up unnecessarily. + /// It consists of, in sequence: executing any expressions that should be + /// executed before the next statement can be evaluated, then a section that + /// performs debug printing, and finally a section that takes the next + /// statement and executes it. If the statement requires any expressions to + /// be evaluated, then they will be added such that the next time `step` is + /// called, all of these expressions are indeed evaluated. + pub(crate) fn step(&mut self, types: &TypeTable, heap: &Heap, modules: &[Module], ctx: &mut impl RunContext) -> EvalResult { // Helper function to transfer multiple values from the expression value // array into a heap region (e.g. constructing arrays or structs). fn transfer_expression_values_front_into_heap(cur_frame: &mut Frame, store: &mut Store, num_values: usize) -> HeapPos { @@ -570,20 +578,36 @@ impl Prompt { let value = cur_frame.expr_values.pop_front().unwrap(); let value = self.store.maybe_read_ref(&value).clone(); - match ctx.get(value.clone(), &mut self.store) { + let port_id = if let Value::Input(port_id) = value { + port_id + } else { + unreachable!("executor calling 'get' on value {:?}", value) + }; + + match ctx.get(port_id, &mut self.store) { Some(result) => { + // We have the result cur_frame.expr_values.push_back(result) }, None => { + // Don't have the result yet, prepare the expression to + // get run again after we've received a message. cur_frame.expr_values.push_front(value.clone()); cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id)); - return Ok(EvalContinuation::BlockGet(value)); + return Ok(EvalContinuation::BlockGet(port_id)); } } }, Method::Put => { let port_value = cur_frame.expr_values.pop_front().unwrap(); let deref_port_value = self.store.maybe_read_ref(&port_value).clone(); + + let port_id = if let Value::Output(port_id) = deref_port_value { + port_id + } else { + unreachable!("executor calling 'put' on value {:?}", deref_port_value) + }; + let msg_value = cur_frame.expr_values.pop_front().unwrap(); let deref_msg_value = self.store.maybe_read_ref(&msg_value).clone(); @@ -597,7 +621,7 @@ impl Prompt { } } - if ctx.did_put(deref_port_value.clone()) { + if ctx.did_put(port_id) { // We're fine, deallocate in case the expression value stack // held an owned value self.store.drop_value(msg_value.get_heap_pos()); @@ -605,17 +629,24 @@ impl Prompt { cur_frame.expr_values.push_front(msg_value); cur_frame.expr_values.push_front(port_value); cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id)); - return Ok(EvalContinuation::Put(deref_port_value, deref_msg_value)); + return Ok(EvalContinuation::Put(port_id, deref_msg_value)); } }, Method::Fires => { let port_value = cur_frame.expr_values.pop_front().unwrap(); let port_value_deref = self.store.maybe_read_ref(&port_value).clone(); - match ctx.fires(port_value_deref.clone()) { + + let port_id = match port_value_deref { + Value::Input(port_id) => port_id, + Value::Output(port_id) => port_id, + _ => unreachable!("executor calling 'fires' on value {:?}", value), + }; + + match ctx.fires(port_id) { None => { cur_frame.expr_values.push_front(port_value); cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id)); - return Ok(EvalContinuation::BlockFires(port_value_deref)); + return Ok(EvalContinuation::BlockFires(port_id)); }, Some(value) => { cur_frame.expr_values.push_back(value); @@ -765,17 +796,25 @@ impl Prompt { self.store.write(ValueId::Stack(variable.unique_id_in_scope as u32), Value::Unassigned); cur_frame.position = stmt.next; + Ok(EvalContinuation::Stepping) }, LocalStatement::Channel(stmt) => { - let [from_value, to_value] = ctx.new_channel(); - self.store.write(ValueId::Stack(heap[stmt.from].unique_id_in_scope as u32), from_value); - self.store.write(ValueId::Stack(heap[stmt.to].unique_id_in_scope as u32), to_value); - - cur_frame.position = stmt.next; + // Need to create a new channel by requesting it from + // the runtime. + match ctx.get_channel() { + None => { + // No channel is pending. So request one + Ok(EvalContinuation::NewChannel) + }, + Some((put_port, get_port)) => { + self.store.write(ValueId::Stack(heap[stmt.from].unique_id_in_scope as u32), put_port); + self.store.write(ValueId::Stack(heap[stmt.to].unique_id_in_scope as u32), get_port); + cur_frame.position = stmt.next; + Ok(EvalContinuation::Stepping) + } + } } } - - Ok(EvalContinuation::Stepping) }, Statement::Labeled(stmt) => { cur_frame.position = stmt.body; diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 81973bff7792a6fe583d7f2c8fce163993217622..ee0c5aed2fcc0282a9526d5b434bc963a111f434 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -26,14 +26,14 @@ pub struct Module { /// Description of a protocol object, used to configure new connectors. #[repr(C)] pub struct ProtocolDescription { - modules: Vec, - heap: Heap, - types: TypeTable, - pool: Mutex, + pub(crate) modules: Vec, + pub(crate) heap: Heap, + pub(crate) types: TypeTable, + pub(crate) pool: Mutex, } #[derive(Debug, Clone)] pub(crate) struct ComponentState { - prompt: Prompt, + pub(crate) prompt: Prompt, } #[allow(dead_code)] @@ -143,6 +143,7 @@ impl ProtocolDescription { let module_root = self.lookup_module_root(module_name).unwrap(); let root = &self.heap[module_root]; let def = root.get_definition_ident(&self.heap, identifier).unwrap(); + // TODO: Check for polymorph ComponentState { prompt: Prompt::new(&self.types, &self.heap, def, 0, ValueGroup::new_stack(args)) } } @@ -161,6 +162,67 @@ impl ProtocolDescription { return None; } } + +// TODO: @temp Should just become a concrete thing that is passed in +pub trait RunContext { + fn did_put(&self, port: PortId) -> bool; + fn get(&self, port: PortId) -> Option; // None if still waiting on message + fn fires(&self, port: PortId) -> Option; // None if not yet branched + fn get_channel(&self) -> Option<(Value, Value)>; // None if not yet prepared +} + +#[derive(Debug)] +pub enum RunResult { + // Can only occur outside sync blocks + ComponentTerminated, // component has exited its procedure + ComponentAtSyncStart, + NewComponent(DefinitionId, i32, ValueGroup), // should also be possible inside sync + NewChannel, // should also be possible inside sync + // Can only occur inside sync blocks + BranchInconsistent, // branch has inconsistent behaviour + BranchMissingPortState(PortId), // branch doesn't know about port firing + BranchMissingPortValue(PortId), // branch hasn't received message on input port yet + BranchAtSyncEnd, + BranchPut(PortId, ValueGroup), +} + +impl ComponentState { + pub(crate) fn run(&mut self, ctx: &mut impl RunContext, pd: &ProtocolDescription) -> RunResult { + use EvalContinuation as EC; + use RunResult as RR; + + loop { + let step_result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, ctx); + match step_result { + Err(reason) => { + // TODO: @temp + println!("Evaluation error:\n{}", reason); + todo!("proper error handling/bubbling up"); + }, + Ok(continuation) => match continuation { + // TODO: Probably want to remove this translation + EC::Stepping => continue, + EC::Inconsistent => return RR::BranchInconsistent, + EC::Terminal => return RR::ComponentTerminated, + EC::SyncBlockStart => return RR::ComponentAtSyncStart, + EC::SyncBlockEnd => return RR::BranchAtSyncEnd, + EC::NewComponent(definition_id, monomorph_idx, args) => + return RR::NewComponent(definition_id, monomorph_idx, arg), + EC::NewChannel => + return RR::NewChannel, + EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id), + EC::BlockGet(port_id) => return RR::BranchMissingPortValue(port_id), + EC::Put(port_id, value) => { + let value_group = ValueGroup::from_store(&self.prompt.store, &[value]); + return RR::BranchPut(port_id, value_group); + }, + } + } + } + } +} + +// TODO: @remove the old stuff impl ComponentState { pub(crate) fn nonsync_run<'a: 'b, 'b>( &'a mut self, @@ -261,16 +323,6 @@ impl ComponentState { _ => unreachable!(), }, EvalContinuation::Put(port, message) => { - let value; - match port { - Value::Output(port_value) => { - value = port_value; - } - Value::Input(port_value) => { - value = port_value; - } - _ => unreachable!(), - } let payload; match message { Value::Null => { @@ -287,7 +339,7 @@ impl ComponentState { } _ => unreachable!(), } - return SyncBlocker::PutMsg(value, payload); + return SyncBlocker::PutMsg(port, payload); } }, } diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index b9389b46f08898ceef34885bc2365787f6e30b93..a6d9e0e02d16010e5298d0bef7979c9c14733448 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -1,2 +1 @@ -mod runtime; -mod registry; \ No newline at end of file +mod runtime; \ No newline at end of file diff --git a/src/runtime2/registry.rs b/src/runtime2/registry.rs deleted file mode 100644 index 892b220a213f1adf8f98e4a86ef30d74aed007ab..0000000000000000000000000000000000000000 --- a/src/runtime2/registry.rs +++ /dev/null @@ -1,14 +0,0 @@ -use std::collections::HashMap; - -use crate::PortId; - -struct PortDesc { - port_id: u32, - owning_connector_id: u32, -} - -pub(crate) struct Registry { - connector_counter: u32, - port_counter: u32, - ports: HashMap -} \ No newline at end of file diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 6575334d7389c223affb7aa403bca17d3f958ae0..8c73481041f32674f8505035dc4dfa306d7519ca 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -1,8 +1,9 @@ use std::sync::Arc; +use std::collections::{HashMap, VecDeque}; +use std::collections::hash_map::{Entry}; -use crate::runtime::error as old_error; - -use crate::Polarity; +use crate::{Polarity, PortId}; +use crate::common::Id; use crate::protocol::*; use crate::protocol::eval::*; @@ -14,25 +15,194 @@ enum AddComponentError { InvalidArgumentType(usize), // value is index of (first) invalid argument } +struct PortDesc { + id: u32, + peer_id: u32, + owning_connector_id: Option, + is_getter: bool, // otherwise one can only call `put` +} + +// Message received from some kind of peer +struct BufferedMessage { + // If in inbox, then sender is the connector's peer. If in the outbox, then + // the sender is the connector itself. + sending_port: PortId, + receiving_port: PortId, + peer_prev_branch_id: Option, // of the sender + peer_cur_branch_id: u32, // of the sender + message: ValueGroup, +} + +struct ConnectorDesc { + id: u32, + in_sync: bool, + branches: Vec, // first one is always non-speculative one + branch_id_counter: u32, + spec_branches_active: VecDeque, // branches that can be run immediately + spec_branches_pending_receive: HashMap, // from port_id to branch index + global_inbox: HashMap<(PortId, u32), BufferedMessage>, + global_outbox: HashMap<(PortId, u32), BufferedMessage>, +} + +impl ConnectorDesc { + /// Creates a new connector description. Implicit assumption is that there + /// is one (non-sync) branch that can be immediately executed. + fn new(id: u32, component_state: ComponentState, owned_ports: Vec) -> Self { + let mut branches_active = VecDeque::new(); + branches_active.push_back(0); + + Self{ + id, + in_sync: false, + branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)], + branch_id_counter: 1, + spec_branches_active: branches_active, + spec_branches_pending_receive: HashMap::new(), + global_inbox: HashMap::new(), + global_outbox: HashMap::new(), + } + } +} + +enum BranchState { + RunningNonSync, // regular running non-speculative branch + RunningSync, // regular running speculative branch + BranchPoint, // branch which ended up being a branching point + ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution + Failed, // branch that became inconsistent +} + +struct BranchPortDesc { + last_registered_identifier: Option, // if putter, then last sent branch ID, if getter, then last received branch ID + num_times_fired: u32, // number of puts/gets on this port +} + +struct BranchDesc { + index: u32, + parent_index: Option, + identifier: u32, + code_state: ComponentState, + branch_state: BranchState, + owned_ports: Vec, + message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value + port_mapping: HashMap, +} + +impl BranchDesc { + /// Creates the first non-sync branch of a connector + fn new_non_sync(component_state: ComponentState, owned_ports: Vec) -> Self { + Self{ + index: 0, + parent_index: None, + identifier: 0, + code_state: component_state, + branch_state: BranchState::RunningNonSync, + owned_ports, + message_inbox: HashMap::new(), + port_mapping: HashMap::new(), + } + } + + /// Creates a sync branch based on the supplied branch. This supplied branch + /// is the branching point for the new one, i.e. the parent in the branching + /// tree. + fn new_sync_from(index: u32, identifier: u32, branch_state: &BranchDesc) -> Self { + Self{ + index, + parent_index: Some(branch_state.index), + identifier, + code_state: branch_state.code_state.clone(), + branch_state: BranchState::RunningSync, + owned_ports: branch_state.owned_ports.clone(), + message_inbox: branch_state.message_inbox.clone(), + port_mapping: branch_state.port_mapping.clone(), + } + } +} + +// Separate from Runtime for borrowing reasons +struct Registry { + ports: HashMap, + port_counter: u32, + connectors: HashMap, + connector_counter: u32, +} + +impl Registry { + fn new() -> Self { + Self{ + ports: HashMap::new(), + port_counter: 0, + connectors: HashMap::new(), + connector_counter: 0, + } + } + + /// Returns (putter_port, getter_port) + pub fn add_channel(&mut self, owning_connector_id: Option) -> (u32, u32) { + let get_id = self.generate_port_id(); + let put_id = self.generate_port_id(); + + self.ports.insert(get_id, PortDesc{ + id: get_id, + peer_id: put_id, + owning_connector_id, + is_getter: true, + }); + self.ports.insert(put_id, PortDesc{ + id: put_id, + peer_id: get_id, + owning_connector_id, + is_getter: false, + }); + + return (put_id, get_id); + } + + fn generate_port_id(&mut self) -> u32 { + let id = self.port_counter; + self.port_counter += 1; + return id; + } +} + +// TODO: @performance, use freelists+ids instead of HashMaps struct Runtime { protocol: Arc, - + registry: Registry, + connectors_active: VecDeque, } impl Runtime { pub fn new(pd: Arc) -> Self { - Self{ protocol: pd } + Self{ + protocol: pd, + registry: Registry::new(), + connectors_active: VecDeque::new(), + } + } + + /// Creates a new channel that is not owned by any connector and returns its + /// endpoints. The returned values are of the (putter port, getter port) + /// respectively. + pub fn add_channel(&mut self) -> (Value, Value) { + let (put_id, get_id) = self.registry.add_channel(None); + return ( + port_value_from_id(None, put_id, true), + port_value_from_id(None, get_id, false) + ); } pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> { use AddComponentError as ACE; - use old_error::AddComponentError as OldACE; + use crate::runtime::error::AddComponentError as OldACE; // TODO: Allow the ValueGroup to contain any kind of value // TODO: Remove the responsibility of adding a component from the PD // Lookup module and the component - // TODO: Remove this error enum translation + // TODO: Remove this error enum translation. Note that for now this + // function forces port-only arguments let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) { Ok(polarities) => polarities, Err(reason) => match reason { @@ -45,6 +215,7 @@ impl Runtime { // Make sure supplied values (and types) are correct let mut ports = Vec::with_capacity(values.values.len()); + for (value_idx, value) in values.values.iter().enumerate() { let polarity = &port_polarities[value_idx]; @@ -68,6 +239,301 @@ impl Runtime { } // Instantiate the component + let component_id = self.generate_connector_id(); let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports); + let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); + + self.registry.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports)); + self.connectors_active.push_back(component_id); + + Ok(()) + } + + pub fn run(&mut self) { + // Go through all active connectors + while !self.connectors_active.is_empty() { + let next_id = self.connectors_active.pop_front().unwrap(); + self.run_connector(next_id); + } + } + + /// Runs a connector for as long as sensible, then returns `true` if the + /// connector should be run again in the future, and return `false` if the + /// connector has terminated. Note that a terminated connector still + /// requires cleanup. + pub fn run_connector(&mut self, id: u32) -> bool { + let desc = self.registry.connectors.get_mut(&id).unwrap(); + let mut run_context = Context{ + connector_id: id, + branch_id: None, + pending_channel: None, + }; + + let mut call_again = false; + + while call_again { + call_again = false; // bit of a silly pattern, maybe revise + + if desc.in_sync { + // Running in synchronous mode, so run all branches until their + // blocking point + debug_assert!(!desc.spec_branches_active.is_empty()); + let branch_index = desc.spec_branches_active.pop_front().unwrap(); + + let branch = &mut desc.branches[branch_index as usize]; + let run_result = branch.code_state.run(&mut run_context, &self.protocol); + + match run_result { + RunResult::BranchInconsistent => { + // Speculative branch became inconsistent. So we don't + // run it again + branch.branch_state = BranchState::Failed; + }, + RunResult::BranchMissingPortState(port_id) => { + // Branch called `fires()` on a port that did not have a + // value assigned yet. So branch and keep running + debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); + debug_assert!(branch.port_mapping.get(&port_id).is_none()); + + let copied_index = Self::duplicate_branch(desc, branch_index); + + // Need to re-borrow to assign changed port state + let original_branch = &mut desc.branches[branch_index as usize]; + original_branch.port_mapping.insert(port_id, BranchPortDesc{ + last_registered_identifier: None, + num_times_fired: 0, + }); + + let copied_branch = &mut desc.branches[copied_index as usize]; + copied_branch.port_mapping.insert(port_id, BranchPortDesc{ + last_registered_identifier: None, + num_times_fired: 1, + }); + + // Run both again + desc.spec_branches_active.push_back(branch_index); + desc.spec_branches_active.push_back(copied_index); + }, + RunResult::BranchMissingPortValue(port_id) => { + // Branch just performed a `get()` on a port that did + // not yet receive a value. + + // First check if a port value is assigned to the + // current branch. If so, check if it is consistent. + debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); + match branch.port_mapping.entry(port_id) { + Entry::Vacant(entry) => { + // No entry yet, so force to firing + entry.insert(BranchPortDesc{ + last_registered_identifier: None, + num_times_fired: 1, + }); + branch.branch_state = BranchState::BranchPoint; + desc.spec_branches_pending_receive.insert(port_id, branch_index); + }, + Entry::Occupied(entry) => { + // Have an entry, check if it is consistent + let entry = entry.get(); + if entry.num_times_fired == 0 { + // Inconsistent + branch.branch_state = BranchState::Failed; + } else { + // Perfectly fine, add to queue + branch.branch_state = BranchState::BranchPoint; + desc.spec_branches_pending_receive.insert(port_id, branch_index); + } + } + } + }, + RunResult::BranchAtSyncEnd => { + branch.branch_state = BranchState::ReachedEndSync; + todo!("somehow propose solution"); + }, + RunResult::BranchPut(port_id, value_group) => { + debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); + debug_assert_eq!(value_group.values.len(), 1) + }, + _ => unreachable!("got result '{:?}' from running component in sync mode", run_result), + } + } else { + // Running in non-synchronous mode + let branch = &mut desc.branches[0]; + let run_result = branch.code_state.run(&mut run_context, &self.protocol); + + match run_result { + RunResult::ComponentTerminated => return false, + RunResult::ComponentAtSyncStart => { + // Prepare for sync execution + Self::prepare_branch_for_sync(desc); + call_again = true; + }, + RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { + // Generate a new connector with its own state + let new_component_id = self.generate_connector_id(); + let new_component_state = ComponentState { + prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments) + }; + + // Transfer the ownership of any ports to the new connector + let mut ports = Vec::with_capacity(arguments.values.len()); + find_ports_in_value_group(&arguments, &mut ports); + for port_id in &ports { + let port = self.registry.ports.get_mut(&port_id.0.u32_suffix).unwrap(); + debug_assert_eq!(port.owning_connector_id.unwrap(), run_context.connector_id); + port.owning_connector_id = Some(new_component_id) + } + + // Finally push the new connector into the registry + let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); + self.registry.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports)); + self.connectors_active.push_back(new_component_id); + }, + RunResult::NewChannel => { + // Prepare channel + debug_assert!(run_context.pending_channel.is_none()); + let (put_id, get_id) = self.registry.add_channel(Some(run_context.connector_id)); + run_context.pending_channel = Some(( + port_value_from_id(Some(run_context.connector_id), put_id, true), + port_value_from_id(Some(run_context.connector_id), get_id, false) + )); + + // Call again so it is retrieved from the context + call_again = true; + }, + _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result), + } + } + } + + return true; + } + + fn generate_connector_id(&mut self) -> u32 { + let id = self.registry.connector_counter; + self.registry.connector_counter += 1; + return id; + } + + // ------------------------------------------------------------------------- + // Helpers for branch management + // ------------------------------------------------------------------------- + + /// Prepares a speculative branch for further execution from the connector's + /// non-speculative base branch. + fn prepare_branch_for_sync(desc: &mut ConnectorDesc) { + // Ensure only one branch is active, the non-sync branch + debug_assert!(!desc.in_sync); + debug_assert_eq!(desc.branches.len(), 1); + debug_assert!(desc.spec_branches_active.is_empty()); + let new_branch_index = 1; + let new_branch_identifier = desc.branch_id_counter; + desc.branch_id_counter += 1; + + // Push first speculative branch as active branch + let new_branch = BranchDesc::new_sync_from(new_branch_index, new_branch_identifier, &desc.branches[0]); + desc.branches.push(new_branch); + desc.spec_branches_active.push_back(new_id); + desc.in_sync = true; + } + + /// Duplicates a particular (speculative) branch and returns its index. + fn duplicate_branch(desc: &mut ConnectorDesc, original_branch_idx: u32) -> u32 { + let original_branch = &desc.branches[original_branch_idx as usize]; + debug_assert!(desc.in_sync); + + let copied_index = desc.branches.len() as u32; + let copied_id = desc.branch_id_counter; + desc.branch_id_counter += 1; + + let copied_branch = BranchDesc::new_sync_from(copied_index, copied_id, original_branch); + desc.branches.push(copied_branch); + + return copied_index; + } +} + +/// Context accessible by the code while being executed by the runtime. When the +/// code is being executed by the runtime it sometimes needs to interact with +/// the runtime. This is achieved by the "code throwing an error code", after +/// which the runtime modifies the appropriate variables and continues executing +/// the code again. +struct Context<'a> { + // Properties of currently running connector/branch + connector_id: u32, + branch_id: Option, + // Resources ready to be retrieved by running code + pending_channel: Option<(Value, Value)>, // (put, get) ports +} + +impl<'a> crate::protocol::RunContext for Context<'a> { + fn did_put(&self, port: PortId) -> bool { + todo!() + } + + fn get(&self, port: PortId) -> Option { + todo!() + } + + fn fires(&self, port: PortId) -> Option { + todo!() + } + + fn get_channel(&mut self) -> Option<(Value, Value)> { + self.pending_channel.take() + } +} + +/// Recursively goes through the value group, attempting to find ports. +/// Duplicates will only be added once. +fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { + // Helper to check a value for a port and recurse if needed. + fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { + match value { + Value::Input(port_id) | Value::Output(port_id) => { + // This is an actual port + for prev_port in ports { + if prev_port == port_id { + // Already added + return; + } + } + + ports.push(*port_id); + }, + Value::Array(heap_pos) | + Value::Message(heap_pos) | + Value::String(heap_pos) | + Value::Struct(heap_pos) | + Value::Union(_, heap_pos) => { + // Reference to some dynamic thing which might contain ports, + // so recurse + let heap_region = &group.regions[*heap_pos as usize]; + for embedded_value in heap_region { + find_port_in_value(group, embedded_value, ports); + } + }, + _ => {}, // values we don't care about + } + } + + // Clear the ports, then scan all the available values + ports.clear(); + for value in &value_group.values { + find_port_in_value(value_group, value, ports); + } +} + +fn port_value_from_id(connector_id: Option, port_id: u32, is_output: bool) -> Value { + let connector_id = connector_id.unwrap_or(u32::MAX); // TODO: @hack, review entire PortId/ConnectorId/Id system + if is_output { + return Value::Output(PortId(Id{ + connector_id, + u32_suffix: port_id + })); + } else { + return Value::Input(PortId(Id{ + connector_id, + u32_suffix: port_id, + })); } } \ No newline at end of file