use std::sync::Arc; use std::collections::{HashMap, VecDeque}; use std::collections::hash_map::{Entry}; use crate::{Polarity, PortId}; use crate::common::Id; use crate::protocol::*; use crate::protocol::eval::*; use super::registry::Registry; enum AddComponentError { ModuleDoesNotExist, ConnectorDoesNotExist, InvalidArgumentType(usize), // value is index of (first) invalid argument } struct PortDesc { id: u32, peer_id: u32, owning_connector_id: Option, is_getter: bool, // otherwise one can only call `put` } // Message received from some kind of peer struct BufferedMessage { // If in inbox, then sender is the connector's peer. If in the outbox, then // the sender is the connector itself. sending_port: PortId, receiving_port: PortId, peer_prev_branch_id: Option, // of the sender peer_cur_branch_id: u32, // of the sender message: ValueGroup, } struct ConnectorDesc { id: u32, in_sync: bool, branches: Vec, // first one is always non-speculative one branch_id_counter: u32, spec_branches_active: VecDeque, // branches that can be run immediately spec_branches_pending_receive: HashMap, // from port_id to branch index global_inbox: HashMap<(PortId, u32), BufferedMessage>, global_outbox: HashMap<(PortId, u32), BufferedMessage>, } impl ConnectorDesc { /// Creates a new connector description. Implicit assumption is that there /// is one (non-sync) branch that can be immediately executed. fn new(id: u32, component_state: ComponentState, owned_ports: Vec) -> Self { let mut branches_active = VecDeque::new(); branches_active.push_back(0); Self{ id, in_sync: false, branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)], branch_id_counter: 1, spec_branches_active: branches_active, spec_branches_pending_receive: HashMap::new(), global_inbox: HashMap::new(), global_outbox: HashMap::new(), } } } enum BranchState { RunningNonSync, // regular running non-speculative branch RunningSync, // regular running speculative branch BranchPoint, // branch which ended up being a branching point ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution Failed, // branch that became inconsistent } struct BranchPortDesc { last_registered_identifier: Option, // if putter, then last sent branch ID, if getter, then last received branch ID num_times_fired: u32, // number of puts/gets on this port } struct BranchDesc { index: u32, parent_index: Option, identifier: u32, code_state: ComponentState, branch_state: BranchState, owned_ports: Vec, message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value port_mapping: HashMap, } impl BranchDesc { /// Creates the first non-sync branch of a connector fn new_non_sync(component_state: ComponentState, owned_ports: Vec) -> Self { Self{ index: 0, parent_index: None, identifier: 0, code_state: component_state, branch_state: BranchState::RunningNonSync, owned_ports, message_inbox: HashMap::new(), port_mapping: HashMap::new(), } } /// Creates a sync branch based on the supplied branch. This supplied branch /// is the branching point for the new one, i.e. the parent in the branching /// tree. fn new_sync_from(index: u32, identifier: u32, branch_state: &BranchDesc) -> Self { Self{ index, parent_index: Some(branch_state.index), identifier, code_state: branch_state.code_state.clone(), branch_state: BranchState::RunningSync, owned_ports: branch_state.owned_ports.clone(), message_inbox: branch_state.message_inbox.clone(), port_mapping: branch_state.port_mapping.clone(), } } } // Separate from Runtime for borrowing reasons struct Registry { ports: HashMap, port_counter: u32, connectors: HashMap, connector_counter: u32, } impl Registry { fn new() -> Self { Self{ ports: HashMap::new(), port_counter: 0, connectors: HashMap::new(), connector_counter: 0, } } /// Returns (putter_port, getter_port) pub fn add_channel(&mut self, owning_connector_id: Option) -> (u32, u32) { let get_id = self.generate_port_id(); let put_id = self.generate_port_id(); self.ports.insert(get_id, PortDesc{ id: get_id, peer_id: put_id, owning_connector_id, is_getter: true, }); self.ports.insert(put_id, PortDesc{ id: put_id, peer_id: get_id, owning_connector_id, is_getter: false, }); return (put_id, get_id); } fn generate_port_id(&mut self) -> u32 { let id = self.port_counter; self.port_counter += 1; return id; } } // TODO: @performance, use freelists+ids instead of HashMaps struct Runtime { protocol: Arc, registry: Registry, connectors_active: VecDeque, } impl Runtime { pub fn new(pd: Arc) -> Self { Self{ protocol: pd, registry: Registry::new(), connectors_active: VecDeque::new(), } } /// Creates a new channel that is not owned by any connector and returns its /// endpoints. The returned values are of the (putter port, getter port) /// respectively. pub fn add_channel(&mut self) -> (Value, Value) { let (put_id, get_id) = self.registry.add_channel(None); return ( port_value_from_id(None, put_id, true), port_value_from_id(None, get_id, false) ); } pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> { use AddComponentError as ACE; use crate::runtime::error::AddComponentError as OldACE; // TODO: Allow the ValueGroup to contain any kind of value // TODO: Remove the responsibility of adding a component from the PD // Lookup module and the component // TODO: Remove this error enum translation. Note that for now this // function forces port-only arguments let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) { Ok(polarities) => polarities, Err(reason) => match reason { OldACE::NonPortTypeParameters => return Err(ACE::InvalidArgumentType(0)), OldACE::NoSuchModule => return Err(ACE::ModuleDoesNotExist), OldACE::NoSuchComponent => return Err(ACE::ModuleDoesNotExist), _ => unreachable!(), } }; // Make sure supplied values (and types) are correct let mut ports = Vec::with_capacity(values.values.len()); for (value_idx, value) in values.values.iter().enumerate() { let polarity = &port_polarities[value_idx]; match value { Value::Input(port_id) => { if *polarity != Polarity::Getter { return Err(ACE::InvalidArgumentType(value_idx)) } ports.push(*port_id); }, Value::Output(port_id) => { if *polarity != Polarity::Putter { return Err(ACE::InvalidArgumentType(value_idx)) } ports.push(*port_id); }, _ => return Err(ACE::InvalidArgumentType(value_idx)) } } // Instantiate the component let component_id = self.generate_connector_id(); let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports); let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); self.registry.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports)); self.connectors_active.push_back(component_id); Ok(()) } pub fn run(&mut self) { // Go through all active connectors while !self.connectors_active.is_empty() { let next_id = self.connectors_active.pop_front().unwrap(); self.run_connector(next_id); } } /// Runs a connector for as long as sensible, then returns `true` if the /// connector should be run again in the future, and return `false` if the /// connector has terminated. Note that a terminated connector still /// requires cleanup. pub fn run_connector(&mut self, id: u32) -> bool { let desc = self.registry.connectors.get_mut(&id).unwrap(); let mut run_context = Context{ connector_id: id, branch_id: None, pending_channel: None, }; let mut call_again = false; while call_again { call_again = false; // bit of a silly pattern, maybe revise if desc.in_sync { // Running in synchronous mode, so run all branches until their // blocking point debug_assert!(!desc.spec_branches_active.is_empty()); let branch_index = desc.spec_branches_active.pop_front().unwrap(); let branch = &mut desc.branches[branch_index as usize]; let run_result = branch.code_state.run(&mut run_context, &self.protocol); match run_result { RunResult::BranchInconsistent => { // Speculative branch became inconsistent. So we don't // run it again branch.branch_state = BranchState::Failed; }, RunResult::BranchMissingPortState(port_id) => { // Branch called `fires()` on a port that did not have a // value assigned yet. So branch and keep running debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); debug_assert!(branch.port_mapping.get(&port_id).is_none()); let copied_index = Self::duplicate_branch(desc, branch_index); // Need to re-borrow to assign changed port state let original_branch = &mut desc.branches[branch_index as usize]; original_branch.port_mapping.insert(port_id, BranchPortDesc{ last_registered_identifier: None, num_times_fired: 0, }); let copied_branch = &mut desc.branches[copied_index as usize]; copied_branch.port_mapping.insert(port_id, BranchPortDesc{ last_registered_identifier: None, num_times_fired: 1, }); // Run both again desc.spec_branches_active.push_back(branch_index); desc.spec_branches_active.push_back(copied_index); }, RunResult::BranchMissingPortValue(port_id) => { // Branch just performed a `get()` on a port that did // not yet receive a value. // First check if a port value is assigned to the // current branch. If so, check if it is consistent. debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); match branch.port_mapping.entry(port_id) { Entry::Vacant(entry) => { // No entry yet, so force to firing entry.insert(BranchPortDesc{ last_registered_identifier: None, num_times_fired: 1, }); branch.branch_state = BranchState::BranchPoint; desc.spec_branches_pending_receive.insert(port_id, branch_index); }, Entry::Occupied(entry) => { // Have an entry, check if it is consistent let entry = entry.get(); if entry.num_times_fired == 0 { // Inconsistent branch.branch_state = BranchState::Failed; } else { // Perfectly fine, add to queue branch.branch_state = BranchState::BranchPoint; desc.spec_branches_pending_receive.insert(port_id, branch_index); } } } }, RunResult::BranchAtSyncEnd => { branch.branch_state = BranchState::ReachedEndSync; todo!("somehow propose solution"); }, RunResult::BranchPut(port_id, value_group) => { debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); debug_assert_eq!(value_group.values.len(), 1) }, _ => unreachable!("got result '{:?}' from running component in sync mode", run_result), } } else { // Running in non-synchronous mode let branch = &mut desc.branches[0]; let run_result = branch.code_state.run(&mut run_context, &self.protocol); match run_result { RunResult::ComponentTerminated => return false, RunResult::ComponentAtSyncStart => { // Prepare for sync execution Self::prepare_branch_for_sync(desc); call_again = true; }, RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { // Generate a new connector with its own state let new_component_id = self.generate_connector_id(); let new_component_state = ComponentState { prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments) }; // Transfer the ownership of any ports to the new connector let mut ports = Vec::with_capacity(arguments.values.len()); find_ports_in_value_group(&arguments, &mut ports); for port_id in &ports { let port = self.registry.ports.get_mut(&port_id.0.u32_suffix).unwrap(); debug_assert_eq!(port.owning_connector_id.unwrap(), run_context.connector_id); port.owning_connector_id = Some(new_component_id) } // Finally push the new connector into the registry let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); self.registry.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports)); self.connectors_active.push_back(new_component_id); }, RunResult::NewChannel => { // Prepare channel debug_assert!(run_context.pending_channel.is_none()); let (put_id, get_id) = self.registry.add_channel(Some(run_context.connector_id)); run_context.pending_channel = Some(( port_value_from_id(Some(run_context.connector_id), put_id, true), port_value_from_id(Some(run_context.connector_id), get_id, false) )); // Call again so it is retrieved from the context call_again = true; }, _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result), } } } return true; } fn generate_connector_id(&mut self) -> u32 { let id = self.registry.connector_counter; self.registry.connector_counter += 1; return id; } // ------------------------------------------------------------------------- // Helpers for branch management // ------------------------------------------------------------------------- /// Prepares a speculative branch for further execution from the connector's /// non-speculative base branch. fn prepare_branch_for_sync(desc: &mut ConnectorDesc) { // Ensure only one branch is active, the non-sync branch debug_assert!(!desc.in_sync); debug_assert_eq!(desc.branches.len(), 1); debug_assert!(desc.spec_branches_active.is_empty()); let new_branch_index = 1; let new_branch_identifier = desc.branch_id_counter; desc.branch_id_counter += 1; // Push first speculative branch as active branch let new_branch = BranchDesc::new_sync_from(new_branch_index, new_branch_identifier, &desc.branches[0]); desc.branches.push(new_branch); desc.spec_branches_active.push_back(new_id); desc.in_sync = true; } /// Duplicates a particular (speculative) branch and returns its index. fn duplicate_branch(desc: &mut ConnectorDesc, original_branch_idx: u32) -> u32 { let original_branch = &desc.branches[original_branch_idx as usize]; debug_assert!(desc.in_sync); let copied_index = desc.branches.len() as u32; let copied_id = desc.branch_id_counter; desc.branch_id_counter += 1; let copied_branch = BranchDesc::new_sync_from(copied_index, copied_id, original_branch); desc.branches.push(copied_branch); return copied_index; } } /// Context accessible by the code while being executed by the runtime. When the /// code is being executed by the runtime it sometimes needs to interact with /// the runtime. This is achieved by the "code throwing an error code", after /// which the runtime modifies the appropriate variables and continues executing /// the code again. struct Context<'a> { // Properties of currently running connector/branch connector_id: u32, branch_id: Option, // Resources ready to be retrieved by running code pending_channel: Option<(Value, Value)>, // (put, get) ports } impl<'a> crate::protocol::RunContext for Context<'a> { fn did_put(&self, port: PortId) -> bool { todo!() } fn get(&self, port: PortId) -> Option { todo!() } fn fires(&self, port: PortId) -> Option { todo!() } fn get_channel(&mut self) -> Option<(Value, Value)> { self.pending_channel.take() } } /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { // Helper to check a value for a port and recurse if needed. fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { match value { Value::Input(port_id) | Value::Output(port_id) => { // This is an actual port for prev_port in ports { if prev_port == port_id { // Already added return; } } ports.push(*port_id); }, Value::Array(heap_pos) | Value::Message(heap_pos) | Value::String(heap_pos) | Value::Struct(heap_pos) | Value::Union(_, heap_pos) => { // Reference to some dynamic thing which might contain ports, // so recurse let heap_region = &group.regions[*heap_pos as usize]; for embedded_value in heap_region { find_port_in_value(group, embedded_value, ports); } }, _ => {}, // values we don't care about } } // Clear the ports, then scan all the available values ports.clear(); for value in &value_group.values { find_port_in_value(value_group, value, ports); } } fn port_value_from_id(connector_id: Option, port_id: u32, is_output: bool) -> Value { let connector_id = connector_id.unwrap_or(u32::MAX); // TODO: @hack, review entire PortId/ConnectorId/Id system if is_output { return Value::Output(PortId(Id{ connector_id, u32_suffix: port_id })); } else { return Value::Input(PortId(Id{ connector_id, u32_suffix: port_id, })); } }