diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs new file mode 100644 index 0000000000000000000000000000000000000000..18443c8a379db1d69ca5027a5f4952ea775ec426 --- /dev/null +++ b/src/runtime2/connector.rs @@ -0,0 +1,693 @@ +use std::collections::HashMap; + +use super::messages::{Message, Inbox}; + +use crate::protocol::{ComponentState, RunContext, RunResult}; +use crate::{PortId, ProtocolDescription}; +use crate::protocol::eval::{ValueGroup, Value}; + +#[derive(Clone, Copy, PartialEq, Eq)] +pub(crate) struct PortIdLocal { + pub id: u32, +} + +impl PortIdLocal { + pub fn new(id: u32) -> Self { + Self{ id } + } + + // TODO: Unsure about this, maybe remove, then also remove all struct + // instances where I call this + pub fn new_invalid() -> Self { + Self{ id: u32::MAX } + } +} + +/// Represents the identifier of a branch (the index within its container). An +/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not +/// yet receive anything from any branch). +#[derive(Clone, Copy, PartialEq, Eq)] +pub(crate) struct BranchId { + pub index: u32, +} + +impl BranchId { + fn new_invalid() -> Self { + Self{ index: 0 } + } + + fn new(index: u32) -> Self { + debug_assert!(index != 0); + Self{ index } + } + + #[inline] + fn is_valid(&self) -> bool { + return self.index != 0; + } +} + +#[derive(PartialEq, Eq)] +pub(crate) enum SpeculativeState { + // Non-synchronous variants + RunningNonSync, // regular execution of code + Error, // encountered a runtime error + Finished, // finished executing connector's code + // Synchronous variants + RunningInSync, // running within a sync block + HaltedAtBranchPoint, // at a branching point (at a `get` call) + ReachedSyncEnd, // reached end of sync block, branch represents a local solution + Inconsistent, // branch can never represent a local solution, so halted +} + +pub(crate) struct Branch { + index: BranchId, + parent_index: BranchId, + // Code execution state + code_state: ComponentState, + sync_state: SpeculativeState, + next_branch_in_queue: Option, + // Message/port state + inbox: HashMap, // TODO: @temporary, remove together with fires() + ports_delta: Vec, +} + +impl Branch { + /// Constructs a non-sync branch. It is assumed that the code is at the + /// first instruction + fn new_initial_branch(component_state: ComponentState) -> Self { + Branch{ + index: BranchId::new_invalid(), + parent_index: BranchId::new_invalid(), + code_state: component_state, + sync_state: SpeculativeState::RunningNonSync, + next_branch_in_queue: None, + inbox: HashMap::new(), + ports_delta: Vec::new(), + } + } + + /// Constructs a sync branch. The provided branch is assumed to be the + /// parent of the new branch within the execution tree. + fn new_sync_branching_from(new_index: u32, parent_branch: &Branch) -> Self { + debug_assert!( + (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) || + (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) + ); + + Branch{ + index: BranchId::new(new_index), + parent_index: parent_branch.index, + code_state: parent_branch.code_state.clone(), + sync_state: SpeculativeState::RunningInSync, + next_branch_in_queue: None, + inbox: parent_branch.inbox.clone(), + ports_delta: parent_branch.ports_delta.clone(), + } + } +} + +#[derive(Clone)] +struct PortAssignment { + is_assigned: bool, + last_registered_branch_id: BranchId, // invalid branch ID implies not assigned yet + num_times_fired: u32, +} + +impl PortAssignment { + fn new_unassigned() -> Self { + Self{ + is_assigned: false, + last_registered_branch_id: BranchId::new_invalid(), + num_times_fired: 0, + } + } + + #[inline] + fn mark_speculative(&mut self, num_times_fired: u32) { + debug_assert!(!self.last_registered_branch_id.is_valid()); + self.is_assigned = true; + self.num_times_fired = num_times_fired; + } + + #[inline] + fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) { + self.is_assigned = true; + self.last_registered_branch_id = branch_id; + self.num_times_fired = num_times_fired; + } +} + +#[derive(Clone, Eq)] +enum PortOwnershipDelta { + TakeOwnership(PortIdLocal), + GiveAwayOwnership(PortIdLocal), +} + +enum PortOwnershipError { + UsedInInteraction(PortIdLocal), + AlreadyGivenAway(PortIdLocal) +} + +/// As the name implies, this contains a description of the ports associated +/// with a connector. +/// TODO: Extend documentation +struct ConnectorPorts { + // Essentially a mapping from `port_index` to `port_id`. + owned_ports: Vec, + // Contains P*B entries, where P is the number of ports and B is the number + // of branches. One can find the appropriate mapping of port p at branch b + // at linear index `b*P+p`. + port_mapping: Vec +} + +impl ConnectorPorts { + /// Constructs the initial ports object. Assumes the presence of the + /// non-sync branch at index 0. Will initialize all entries for the non-sync + /// branch. + fn new(owned_ports: Vec) -> Self { + let num_ports = owned_ports.len(); + let mut port_mapping = Vec::with_capacity(num_ports); + for _ in 0..num_ports { + port_mapping.push(PortAssignment::new_unassigned()); + } + + Self{ owned_ports, port_mapping } + } + + /// Prepares the port mapping for a new branch. Assumes that there is no + /// intermediate branch index that we have skipped. + fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) { + let num_ports = self.owned_ports.len(); + let parent_base_idx = parent_branch_idx as usize * num_ports; + let new_base_idx = new_branch_idx as usize * num_ports; + + debug_assert!(parent_branch_idx < new_branch_idx); + debug_assert!(new_base_idx == self.port_mapping.len()); + + self.port_mapping.reserve(num_ports); + for offset in 0..num_ports { + let parent_port = &self.port_mapping[parent_base_idx + offset]; + self.port_mapping.push(parent_port.clone()); + } + } + + /// Removes a particular port from the connector. May only be done if the + /// connector is in non-sync mode + fn remove_port(&mut self, port_id: PortIdLocal) { + debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode + let port_index = self.get_port_index(port_id).unwrap(); + self.owned_ports.remove(port_index); + self.port_mapping.remove(port_index); + } + + /// Retrieves the index associated with a port id. Note that the port might + /// not exist (yet) if a speculative branch has just received the port. + /// TODO: But then again, one cannot use that port, right? + #[inline] + fn get_port_index(&self, port_id: PortIdLocal) -> Option { + for (idx, port) in self.owned_ports.iter().enumerate() { + if port == port_id { + return Some(idx) + } + } + + return None + } + + #[inline] + fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment { + let mapped_idx = self.mapped_index(branch_idx, port_idx); + return &self.port_mapping[mapped_idx]; + } + + #[inline] + fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment { + let mapped_idx = self.mapped_index(branch_idx, port_idx); + return &mut self.port_mapping(mapped_idx); + } + + fn num_ports(&self) -> usize { + return self.owned_ports.len(); + } + + + // Function for internal use: retrieve index in flattened port mapping array + // based on branch/port index. + #[inline] + fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize { + let branch_idx = branch_idx as usize; + let num_ports = self.owned_ports.len(); + + debug_assert!(port_idx < num_ports); + debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len()); + + return branch_idx * num_ports + port_idx; + } +} + +struct BranchQueue { + first: u32, + last: u32, +} + +impl BranchQueue { + fn new() -> Self { + Self{ first: 0, last: 0 } + } + + fn is_empty(&self) -> bool { + debug_assert!((self.first == 0) == (self.last == 0)); + return self.first == 0; + } +} + +pub(crate) struct Connector { + // State and properties of connector itself + id: u32, + in_sync: bool, + // Branch management + branches: Vec, // first branch is always non-speculative one + sync_active: BranchQueue, + sync_pending_get: BranchQueue, + sync_finished: BranchQueue, + // Port/message management + ports: ConnectorPorts, + inbox: Inbox, +} + +struct TempCtx {} +impl RunContext for TempCtx { + fn did_put(&mut self, port: PortId) -> bool { + todo!() + } + + fn get(&mut self, port: PortId) -> Option { + todo!() + } + + fn fires(&mut self, port: PortId) -> Option { + todo!() + } + + fn get_channel(&mut self) -> Option<(Value, Value)> { + todo!() + } +} + +impl Connector { + /// Constructs a representation of a connector. The assumption is that the + /// initial branch is at the first instruction of the connector's code, + /// hence is in a non-sync state. + pub fn new(id: u32, initial_branch: Branch, owned_ports: Vec) -> Self { + Self{ + id, + in_sync: false, + branches: vec![initial_branch], + sync_active: BranchQueue::new(), + sync_pending_get: BranchQueue::new(), + sync_finished: BranchQueue::new(), + ports: ConnectorPorts::new(owned_ports), + inbox: Inbox::new(), + } + } + + /// Runs the connector in synchronous mode. Potential changes to the global + /// system's state are added to the `RunDeltaState` object by the connector, + /// where it is the caller's responsibility to immediately take care of + /// those changes. The return value indicates when (and if) the connector + /// needs to be scheduled again. + pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { + debug_assert!(self.in_sync); + debug_assert!(!self.sync_active.is_empty()); + + let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active); + + // Run the branch to the next blocking point + let mut run_context = TempCtx{}; + let run_result = branch.code_state.run(&mut run_context, pd); + + // Match statement contains `return` statements only if the particular + // run result behind handled requires an immediate re-run of the + // connector. + match run_result { + RunResult::BranchInconsistent => { + // Speculative branch became inconsistent + branch.sync_state = SpeculativeState::Inconsistent; + }, + RunResult::BranchMissingPortState(port_id) => { + // Branch called `fires()` on a port that does not yet have an + // assigned speculative value. So we need to create those + // branches + let local_port_id = PortIdLocal::new(port_id.0.u32_suffix); + let local_port_index = self.ports.get_port_index(local_port_id).unwrap(); + + debug_assert!(self.ports.owned_ports.contains(&local_port_id)); + let silent_branch = &*branch; + + // Create a copied branch who will have the port set to firing + let firing_index = self.branches.len() as u32; + let mut firing_branch = Branch::new_sync_branching_from(firing_index, silent_branch); + self.ports.prepare_sync_branch(branch.index.index, firing_index); + + let firing_port = self.ports.get_port_mut(firing_index, local_port_index); + firing_port.mark_speculative(1); + + // Assign the old branch a silent value + let silent_port = self.ports.get_port_mut(silent_branch.index.index, local_port_index); + silent_port.mark_speculative(0); + + // Run both branches again + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch.index); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch.index); + self.branches.push(firing_branch); + + return ConnectorScheduling::Immediate; + }, + RunResult::BranchMissingPortValue(port_id) => { + // Branch performed a `get` on a port that has not yet received + // a value in its inbox. + let local_port_id = PortIdLocal::new(port_id.0.u32_suffix); + let local_port_index = self.ports.get_port_index(local_port_id).unwrap(); + let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index); + + // Check for port mapping assignment and, if present, if it is + // consistent + let is_valid_get = if port_mapping.is_assigned { + assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires` + port_mapping.num_times_fired == 1 + } else { + // Not yet assigned + port_mapping.mark_speculative(1); + true + }; + + if is_valid_get { + // Mark as a branching point for future messages + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch.index); + + // But if some messages can be immediately applied, do so + // now. + let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id); + if !messages.is_empty() { + // TODO: If message contains ports, transfer ownership of port. + for message in messages { + // For each message, for the execution and feed it + // the provided message + let new_branch_index = self.branches.len() as u32; + let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch); + self.ports.prepare_sync_branch(branch.index.index, new_branch_index); + + let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index); + port_mapping.last_registered_branch_id = message.sender_cur_branch_id; + debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1); + + new_branch.inbox.insert(local_port_id, message.clone()); + + // Schedule the new branch + debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index); + self.branches.push(new_branch); + } + + // Because we have new branches to run, schedule + // immediately + return ConnectorScheduling::Immediate; + } + } else { + branch.sync_state = SpeculativeState::Inconsistent; + } + }, + RunResult::BranchAtSyncEnd => { + // Branch is done, go through all of the ports that are not yet + // assigned and modify them to be + for port_idx in 0..self.ports.num_ports() { + let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx); + if !port_mapping.is_assigned { + port_mapping.mark_speculative(0); + } + } + + branch.sync_state = SpeculativeState::ReachedSyncEnd; + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch.index); + }, + RunResult::BranchPut(port_id, value_group) => { + // Branch performed a `put` on a particualar port. + let local_port_id = PortIdLocal{ id: port_id.0.u32_suffix }; + let local_port_index = self.ports.get_port_index(local_port_id).unwrap(); + + // Check the port mapping for consistency + // TODO: For now we can only put once, so that simplifies stuff + let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index); + let is_valid_put = if port_mapping.is_assigned { + // Already assigned, so must be speculative and one time + // firing, otherwise we are `put`ing multiple times. + if port_mapping.last_registered_branch_id.is_valid() { + // Already did a `put` + todo!("handle error through RunDeltaState"); + } else { + // Valid if speculatively firing + port_mapping.num_times_fired == 1 + } + } else { + // Not yet assigned, do so now + true + }; + + if is_valid_put { + // Put in run results for thread to pick up and transfer to + // the correct connector inbox. + port_mapping.mark_definitive(branch.index, 1); + let message = Message{ + sending_port: local_port_id, + receiving_port: PortIdLocal::new_invalid(), + sender_prev_branch_id: BranchId::new_invalid(), + sender_cur_branch_id: branch.index, + message: value_group, + }; + + results.outbox.push(message); + return ConnectorScheduling::Immediate + } else { + branch.sync_state = SpeculativeState::Inconsistent; + } + }, + _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result), + } + + // Not immediately scheduling, so schedule again if there are more + // branches to run + if self.sync_active.is_empty() { + return ConnectorScheduling::NotNow; + } else { + return ConnectorScheduling::Later; + } + } + + /// Runs the connector in non-synchronous mode. + fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { + debug_assert!(!self.in_sync); + debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); + debug_assert!(self.branches.len() == 1); + + let branch = &mut self.branches[0]; + debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); + + let mut run_context = TempCtx{}; + let run_result = branch.code_state.run(&mut run_context, pd); + + match run_result { + RunResult::ComponentTerminated => { + // Need to wait until all children are terminated + // TODO: Think about how to do this? + branch.sync_state = SpeculativeState::Finished; + return ConnectorScheduling::NotNow; + }, + RunResult::ComponentAtSyncStart => { + // Prepare for sync execution and reschedule immediately + self.in_sync = true; + let first_sync_branch = Branch::new_sync_branching_from(1, branch); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index); + self.branches.push(first_sync_branch); + + return ConnectorScheduling::Later; + }, + RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { + // Construction of a new component. Find all references to ports + // inside of the arguments + let first_port_idx = results.ports.len(); + find_ports_in_value_group(&arguments, &mut results.ports); + + for port + } + } + + ConnectorScheduling::NotNow // TODO: @Temp + } + + // ------------------------------------------------------------------------- + // Internal helpers + // ------------------------------------------------------------------------- + + // Helpers for management of the branches and their internally stored + // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming + // linked lists inside of the vector of branches. + + #[inline] + fn pop_branch(branches: &mut Vec, queue: &mut BranchQueue) -> &mut Branch { + debug_assert!(queue.first != 0); + let branch = &mut branches[queue.first as usize]; + *queue.first = branch.next_branch_in_queue.unwrap_or(0); + branch.next_branch_in_queue = None; + + if *queue.first == 0 { + // No more entries in queue + debug_assert_eq!(*queue.last, branch.index.index); + *queue.last = 0; + } + + return branch; + } + + #[inline] + fn push_branch_into_queue(branches: &mut Vec, queue: &mut BranchQueue, to_push: BranchId) { + debug_assert!(to_push.is_valid()); + let to_push = to_push.index; + + if *queue.last == 0 { + // No branches in the queue at all + debug_assert_eq!(*queue.first, 0); + branches[to_push as usize].next_branch_in_queue = None; + *queue.first = to_push; + *queue.last = to_push; + } else { + // Pre-existing branch in the queue + debug_assert_ne!(*queue.first, 0); + branches[*queue.last as usize].next_branch_in_queue = Some(to_push); + *queue.last = to_push; + } + } + + // Helpers for local port management. Specifically for adopting/losing + // ownership over ports + + /// Marks the ports as being "given away" (e.g. by sending a message over a + /// channel, or by constructing a connector). Will return an error if the + /// connector doesn't own the port in the first place. + fn give_away_ports(ports: &mut ConnectorPorts, in_sync: bool, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + debug_assert!(in_sync == !branch.index.is_valid()); + + for port_id in port_ids { + match ports.get_port_index(*port_id) { + Some(port_index) => { + // We (used to) own the port + let port_mapping = ports.get_port(branch.index.index, port_index); + if port_mapping.is_assigned { + // Port is used in some kind of interaction. Cannot both + // give away the port and use it in an interaction + return Err(PortOwnershipError::UsedInInteraction(*port_id)) + } + + // Make sure it is not already given away + for delta in &branch.ports_delta { + match delta { + PortOwnershipDelta::TakeOwnership(_) => unreachable!(), // because we had a port mapping + PortOwnershipDelta::GiveAwayOwnership(given_away_port_id) => { + if port_id == given_away_port_id { + return Err(PortOwnershipError::AlreadyGivenAway(*port_id)); + } + } + } + } + + // We're fine, the port will be given away. Note that if we + // are not in sync mode, then we can simply remove the + // ownership immediately. + if in_sync { + branch.ports_delta.push(PortOwnershipDelta::GiveAwayOwnership(*port_id)); + } else { + + } + }, + None => { + // We did not yet own the port, so we must have received it + // this round, and we're going to give it away again. + debug_assert!(branch.ports_delta.contains(&PortOwnershipDelta::TakeOwnership(*port_id))); + let delta_to_find = PortOwnershipDelta::TakeOwnership(*port_id); + for delta_idx in 0..branch.ports_delta.len() { + if branch.ports_delta[delta_idx] == delta_to_find { + branch.ports_delta.remove(delta_idx); + break; + } + } + + // Note for programmers: the fact that the message that + // contains this port will end up at another connector will + // take care of its new ownership. + } + } + } + + return Ok(()); + } + + /// Adopt ownership of the ports +} + +/// A data structure passed to a connector whose code is being executed that is +/// used to queue up various state changes that have to be applied after +/// running, e.g. the messages the have to be transferred to other connectors. +// TODO: Come up with a better name +struct RunDeltaState { + outbox: Vec, + ports: Vec, +} + +enum ConnectorScheduling { + Immediate, // Run again, immediately + Later, // Schedule for running, at some later point in time + NotNow, // Do not reschedule for running +} + + +/// Recursively goes through the value group, attempting to find ports. +/// Duplicates will only be added once. +fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { + // Helper to check a value for a port and recurse if needed. + fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { + match value { + Value::Input(port_id) | Value::Output(port_id) => { + // This is an actual port + let cur_port = PortIdLocal::new(port_id.0.u32_suffix); + for prev_port in ports.iter() { + if prev_port == cur_port { + // Already added + return; + } + } + + ports.push(cur_port); + }, + Value::Array(heap_pos) | + Value::Message(heap_pos) | + Value::String(heap_pos) | + Value::Struct(heap_pos) | + Value::Union(_, heap_pos) => { + // Reference to some dynamic thing which might contain ports, + // so recurse + let heap_region = &group.regions[*heap_pos as usize]; + for embedded_value in heap_region { + find_port_in_value(group, embedded_value, ports); + } + }, + _ => {}, // values we don't care about + } + } + + // Clear the ports, then scan all the available values + ports.clear(); + for value in &value_group.values { + find_port_in_value(value_group, value, ports); + } +} \ No newline at end of file