diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 6575334d7389c223affb7aa403bca17d3f958ae0..8c73481041f32674f8505035dc4dfa306d7519ca 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -1,8 +1,9 @@ use std::sync::Arc; +use std::collections::{HashMap, VecDeque}; +use std::collections::hash_map::{Entry}; -use crate::runtime::error as old_error; - -use crate::Polarity; +use crate::{Polarity, PortId}; +use crate::common::Id; use crate::protocol::*; use crate::protocol::eval::*; @@ -14,25 +15,194 @@ enum AddComponentError { InvalidArgumentType(usize), // value is index of (first) invalid argument } +struct PortDesc { + id: u32, + peer_id: u32, + owning_connector_id: Option, + is_getter: bool, // otherwise one can only call `put` +} + +// Message received from some kind of peer +struct BufferedMessage { + // If in inbox, then sender is the connector's peer. If in the outbox, then + // the sender is the connector itself. + sending_port: PortId, + receiving_port: PortId, + peer_prev_branch_id: Option, // of the sender + peer_cur_branch_id: u32, // of the sender + message: ValueGroup, +} + +struct ConnectorDesc { + id: u32, + in_sync: bool, + branches: Vec, // first one is always non-speculative one + branch_id_counter: u32, + spec_branches_active: VecDeque, // branches that can be run immediately + spec_branches_pending_receive: HashMap, // from port_id to branch index + global_inbox: HashMap<(PortId, u32), BufferedMessage>, + global_outbox: HashMap<(PortId, u32), BufferedMessage>, +} + +impl ConnectorDesc { + /// Creates a new connector description. Implicit assumption is that there + /// is one (non-sync) branch that can be immediately executed. + fn new(id: u32, component_state: ComponentState, owned_ports: Vec) -> Self { + let mut branches_active = VecDeque::new(); + branches_active.push_back(0); + + Self{ + id, + in_sync: false, + branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)], + branch_id_counter: 1, + spec_branches_active: branches_active, + spec_branches_pending_receive: HashMap::new(), + global_inbox: HashMap::new(), + global_outbox: HashMap::new(), + } + } +} + +enum BranchState { + RunningNonSync, // regular running non-speculative branch + RunningSync, // regular running speculative branch + BranchPoint, // branch which ended up being a branching point + ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution + Failed, // branch that became inconsistent +} + +struct BranchPortDesc { + last_registered_identifier: Option, // if putter, then last sent branch ID, if getter, then last received branch ID + num_times_fired: u32, // number of puts/gets on this port +} + +struct BranchDesc { + index: u32, + parent_index: Option, + identifier: u32, + code_state: ComponentState, + branch_state: BranchState, + owned_ports: Vec, + message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value + port_mapping: HashMap, +} + +impl BranchDesc { + /// Creates the first non-sync branch of a connector + fn new_non_sync(component_state: ComponentState, owned_ports: Vec) -> Self { + Self{ + index: 0, + parent_index: None, + identifier: 0, + code_state: component_state, + branch_state: BranchState::RunningNonSync, + owned_ports, + message_inbox: HashMap::new(), + port_mapping: HashMap::new(), + } + } + + /// Creates a sync branch based on the supplied branch. This supplied branch + /// is the branching point for the new one, i.e. the parent in the branching + /// tree. + fn new_sync_from(index: u32, identifier: u32, branch_state: &BranchDesc) -> Self { + Self{ + index, + parent_index: Some(branch_state.index), + identifier, + code_state: branch_state.code_state.clone(), + branch_state: BranchState::RunningSync, + owned_ports: branch_state.owned_ports.clone(), + message_inbox: branch_state.message_inbox.clone(), + port_mapping: branch_state.port_mapping.clone(), + } + } +} + +// Separate from Runtime for borrowing reasons +struct Registry { + ports: HashMap, + port_counter: u32, + connectors: HashMap, + connector_counter: u32, +} + +impl Registry { + fn new() -> Self { + Self{ + ports: HashMap::new(), + port_counter: 0, + connectors: HashMap::new(), + connector_counter: 0, + } + } + + /// Returns (putter_port, getter_port) + pub fn add_channel(&mut self, owning_connector_id: Option) -> (u32, u32) { + let get_id = self.generate_port_id(); + let put_id = self.generate_port_id(); + + self.ports.insert(get_id, PortDesc{ + id: get_id, + peer_id: put_id, + owning_connector_id, + is_getter: true, + }); + self.ports.insert(put_id, PortDesc{ + id: put_id, + peer_id: get_id, + owning_connector_id, + is_getter: false, + }); + + return (put_id, get_id); + } + + fn generate_port_id(&mut self) -> u32 { + let id = self.port_counter; + self.port_counter += 1; + return id; + } +} + +// TODO: @performance, use freelists+ids instead of HashMaps struct Runtime { protocol: Arc, - + registry: Registry, + connectors_active: VecDeque, } impl Runtime { pub fn new(pd: Arc) -> Self { - Self{ protocol: pd } + Self{ + protocol: pd, + registry: Registry::new(), + connectors_active: VecDeque::new(), + } + } + + /// Creates a new channel that is not owned by any connector and returns its + /// endpoints. The returned values are of the (putter port, getter port) + /// respectively. + pub fn add_channel(&mut self) -> (Value, Value) { + let (put_id, get_id) = self.registry.add_channel(None); + return ( + port_value_from_id(None, put_id, true), + port_value_from_id(None, get_id, false) + ); } pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> { use AddComponentError as ACE; - use old_error::AddComponentError as OldACE; + use crate::runtime::error::AddComponentError as OldACE; // TODO: Allow the ValueGroup to contain any kind of value // TODO: Remove the responsibility of adding a component from the PD // Lookup module and the component - // TODO: Remove this error enum translation + // TODO: Remove this error enum translation. Note that for now this + // function forces port-only arguments let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) { Ok(polarities) => polarities, Err(reason) => match reason { @@ -45,6 +215,7 @@ impl Runtime { // Make sure supplied values (and types) are correct let mut ports = Vec::with_capacity(values.values.len()); + for (value_idx, value) in values.values.iter().enumerate() { let polarity = &port_polarities[value_idx]; @@ -68,6 +239,301 @@ impl Runtime { } // Instantiate the component + let component_id = self.generate_connector_id(); let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports); + let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); + + self.registry.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports)); + self.connectors_active.push_back(component_id); + + Ok(()) + } + + pub fn run(&mut self) { + // Go through all active connectors + while !self.connectors_active.is_empty() { + let next_id = self.connectors_active.pop_front().unwrap(); + self.run_connector(next_id); + } + } + + /// Runs a connector for as long as sensible, then returns `true` if the + /// connector should be run again in the future, and return `false` if the + /// connector has terminated. Note that a terminated connector still + /// requires cleanup. + pub fn run_connector(&mut self, id: u32) -> bool { + let desc = self.registry.connectors.get_mut(&id).unwrap(); + let mut run_context = Context{ + connector_id: id, + branch_id: None, + pending_channel: None, + }; + + let mut call_again = false; + + while call_again { + call_again = false; // bit of a silly pattern, maybe revise + + if desc.in_sync { + // Running in synchronous mode, so run all branches until their + // blocking point + debug_assert!(!desc.spec_branches_active.is_empty()); + let branch_index = desc.spec_branches_active.pop_front().unwrap(); + + let branch = &mut desc.branches[branch_index as usize]; + let run_result = branch.code_state.run(&mut run_context, &self.protocol); + + match run_result { + RunResult::BranchInconsistent => { + // Speculative branch became inconsistent. So we don't + // run it again + branch.branch_state = BranchState::Failed; + }, + RunResult::BranchMissingPortState(port_id) => { + // Branch called `fires()` on a port that did not have a + // value assigned yet. So branch and keep running + debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); + debug_assert!(branch.port_mapping.get(&port_id).is_none()); + + let copied_index = Self::duplicate_branch(desc, branch_index); + + // Need to re-borrow to assign changed port state + let original_branch = &mut desc.branches[branch_index as usize]; + original_branch.port_mapping.insert(port_id, BranchPortDesc{ + last_registered_identifier: None, + num_times_fired: 0, + }); + + let copied_branch = &mut desc.branches[copied_index as usize]; + copied_branch.port_mapping.insert(port_id, BranchPortDesc{ + last_registered_identifier: None, + num_times_fired: 1, + }); + + // Run both again + desc.spec_branches_active.push_back(branch_index); + desc.spec_branches_active.push_back(copied_index); + }, + RunResult::BranchMissingPortValue(port_id) => { + // Branch just performed a `get()` on a port that did + // not yet receive a value. + + // First check if a port value is assigned to the + // current branch. If so, check if it is consistent. + debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); + match branch.port_mapping.entry(port_id) { + Entry::Vacant(entry) => { + // No entry yet, so force to firing + entry.insert(BranchPortDesc{ + last_registered_identifier: None, + num_times_fired: 1, + }); + branch.branch_state = BranchState::BranchPoint; + desc.spec_branches_pending_receive.insert(port_id, branch_index); + }, + Entry::Occupied(entry) => { + // Have an entry, check if it is consistent + let entry = entry.get(); + if entry.num_times_fired == 0 { + // Inconsistent + branch.branch_state = BranchState::Failed; + } else { + // Perfectly fine, add to queue + branch.branch_state = BranchState::BranchPoint; + desc.spec_branches_pending_receive.insert(port_id, branch_index); + } + } + } + }, + RunResult::BranchAtSyncEnd => { + branch.branch_state = BranchState::ReachedEndSync; + todo!("somehow propose solution"); + }, + RunResult::BranchPut(port_id, value_group) => { + debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); + debug_assert_eq!(value_group.values.len(), 1) + }, + _ => unreachable!("got result '{:?}' from running component in sync mode", run_result), + } + } else { + // Running in non-synchronous mode + let branch = &mut desc.branches[0]; + let run_result = branch.code_state.run(&mut run_context, &self.protocol); + + match run_result { + RunResult::ComponentTerminated => return false, + RunResult::ComponentAtSyncStart => { + // Prepare for sync execution + Self::prepare_branch_for_sync(desc); + call_again = true; + }, + RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { + // Generate a new connector with its own state + let new_component_id = self.generate_connector_id(); + let new_component_state = ComponentState { + prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments) + }; + + // Transfer the ownership of any ports to the new connector + let mut ports = Vec::with_capacity(arguments.values.len()); + find_ports_in_value_group(&arguments, &mut ports); + for port_id in &ports { + let port = self.registry.ports.get_mut(&port_id.0.u32_suffix).unwrap(); + debug_assert_eq!(port.owning_connector_id.unwrap(), run_context.connector_id); + port.owning_connector_id = Some(new_component_id) + } + + // Finally push the new connector into the registry + let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); + self.registry.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports)); + self.connectors_active.push_back(new_component_id); + }, + RunResult::NewChannel => { + // Prepare channel + debug_assert!(run_context.pending_channel.is_none()); + let (put_id, get_id) = self.registry.add_channel(Some(run_context.connector_id)); + run_context.pending_channel = Some(( + port_value_from_id(Some(run_context.connector_id), put_id, true), + port_value_from_id(Some(run_context.connector_id), get_id, false) + )); + + // Call again so it is retrieved from the context + call_again = true; + }, + _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result), + } + } + } + + return true; + } + + fn generate_connector_id(&mut self) -> u32 { + let id = self.registry.connector_counter; + self.registry.connector_counter += 1; + return id; + } + + // ------------------------------------------------------------------------- + // Helpers for branch management + // ------------------------------------------------------------------------- + + /// Prepares a speculative branch for further execution from the connector's + /// non-speculative base branch. + fn prepare_branch_for_sync(desc: &mut ConnectorDesc) { + // Ensure only one branch is active, the non-sync branch + debug_assert!(!desc.in_sync); + debug_assert_eq!(desc.branches.len(), 1); + debug_assert!(desc.spec_branches_active.is_empty()); + let new_branch_index = 1; + let new_branch_identifier = desc.branch_id_counter; + desc.branch_id_counter += 1; + + // Push first speculative branch as active branch + let new_branch = BranchDesc::new_sync_from(new_branch_index, new_branch_identifier, &desc.branches[0]); + desc.branches.push(new_branch); + desc.spec_branches_active.push_back(new_id); + desc.in_sync = true; + } + + /// Duplicates a particular (speculative) branch and returns its index. + fn duplicate_branch(desc: &mut ConnectorDesc, original_branch_idx: u32) -> u32 { + let original_branch = &desc.branches[original_branch_idx as usize]; + debug_assert!(desc.in_sync); + + let copied_index = desc.branches.len() as u32; + let copied_id = desc.branch_id_counter; + desc.branch_id_counter += 1; + + let copied_branch = BranchDesc::new_sync_from(copied_index, copied_id, original_branch); + desc.branches.push(copied_branch); + + return copied_index; + } +} + +/// Context accessible by the code while being executed by the runtime. When the +/// code is being executed by the runtime it sometimes needs to interact with +/// the runtime. This is achieved by the "code throwing an error code", after +/// which the runtime modifies the appropriate variables and continues executing +/// the code again. +struct Context<'a> { + // Properties of currently running connector/branch + connector_id: u32, + branch_id: Option, + // Resources ready to be retrieved by running code + pending_channel: Option<(Value, Value)>, // (put, get) ports +} + +impl<'a> crate::protocol::RunContext for Context<'a> { + fn did_put(&self, port: PortId) -> bool { + todo!() + } + + fn get(&self, port: PortId) -> Option { + todo!() + } + + fn fires(&self, port: PortId) -> Option { + todo!() + } + + fn get_channel(&mut self) -> Option<(Value, Value)> { + self.pending_channel.take() + } +} + +/// Recursively goes through the value group, attempting to find ports. +/// Duplicates will only be added once. +fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { + // Helper to check a value for a port and recurse if needed. + fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { + match value { + Value::Input(port_id) | Value::Output(port_id) => { + // This is an actual port + for prev_port in ports { + if prev_port == port_id { + // Already added + return; + } + } + + ports.push(*port_id); + }, + Value::Array(heap_pos) | + Value::Message(heap_pos) | + Value::String(heap_pos) | + Value::Struct(heap_pos) | + Value::Union(_, heap_pos) => { + // Reference to some dynamic thing which might contain ports, + // so recurse + let heap_region = &group.regions[*heap_pos as usize]; + for embedded_value in heap_region { + find_port_in_value(group, embedded_value, ports); + } + }, + _ => {}, // values we don't care about + } + } + + // Clear the ports, then scan all the available values + ports.clear(); + for value in &value_group.values { + find_port_in_value(value_group, value, ports); + } +} + +fn port_value_from_id(connector_id: Option, port_id: u32, is_output: bool) -> Value { + let connector_id = connector_id.unwrap_or(u32::MAX); // TODO: @hack, review entire PortId/ConnectorId/Id system + if is_output { + return Value::Output(PortId(Id{ + connector_id, + u32_suffix: port_id + })); + } else { + return Value::Input(PortId(Id{ + connector_id, + u32_suffix: port_id, + })); } } \ No newline at end of file