diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 299c64f158dd53f70f55f17456bcb4a1d7570d8c..2044a5d5017d9723d998127ae9e8f312a689cd0d 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -1,9 +1,10 @@ use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter}; -use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId}; +use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId}; use crate::protocol::*; use crate::runtime2::*; use crate::runtime2::communication::*; +use crate::runtime2::component::component_pdl::find_ports_in_value_group; use super::{CompCtx, CompPDL, CompId}; use super::component_context::*; @@ -78,6 +79,7 @@ pub(crate) enum CompMode { BlockedGet, // blocked because we need to receive a message on a particular port BlockedPut, // component is blocked because the port is blocked BlockedSelect, // waiting on message to complete the select statement + BlockedPutPorts,// blocked because we're waiting to send a data message containing ports StartExit, // temporary state: if encountered then we start the shutdown process. BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 @@ -88,7 +90,7 @@ impl CompMode { use CompMode::*; match self { - Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true, + Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => true, NonSync | StartExit | BusyExit | Exit => false, } } @@ -97,7 +99,7 @@ impl CompMode { use CompMode::*; match self { - NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => false, + NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => false, StartExit | BusyExit => true, Exit => false, } @@ -256,6 +258,30 @@ pub(crate) fn default_send_data_message( return Ok(CompScheduling::Sleep); } else { + // Check if there are any ports that are being transmitted + let mut ports = Vec::new(); + find_ports_in_value_group(&value, &mut ports); + if !ports.is_empty() { + + + for (value_location, port_id) in ports { + let transmitted_port_handle = comp_ctx.get_port_handle(port_id); + let transmitted_port = comp_ctx.get_port(transmitted_port_handle); + + if transmitted_port.state.is_set(PortStateFlag::Transmitted) { + // Note: We could also attach where the port has been + // transferred + return Err(( + port_info.last_instruction, + String::from("Cannot send this message, as it contains a previously transmitted port") + )); + } + + // Prepare ack for PPC + // Prepare PPC message + } + } + // Port is not blocked, so send to the peer let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); @@ -800,6 +826,10 @@ fn default_handle_recently_unblocked_port( debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller if exec_state.is_blocked_on_put(port_id) { + // Return to the regular execution mode + exec_state.mode = CompMode::Sync; + exec_state.mode_port = PortId::new_invalid(); + // Annotate the message that we're going to send let port_info = comp_ctx.get_port(port_handle); // for immutable access debug_assert_eq!(port_info.kind, PortKind::Putter); @@ -825,3 +855,45 @@ pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId { pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId { return EvalPortId{ id: port_id.0 }; } + +/// Recursively goes through the value group, attempting to find ports. +/// Duplicates will only be added once. +pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<(ValueId, PortId)>) { + // Helper to check a value for a port and recurse if needed. + fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut Vec<(ValueId, PortId)>) { + match value { + Value::Input(port_id) | Value::Output(port_id) => { + // This is an actual port + let cur_port = PortId(port_id.id); + for prev_port in ports.iter() { + if *prev_port == cur_port.1 { + // Already added + return; + } + } + + ports.push((value_location, cur_port)); + }, + Value::Array(heap_pos) | + Value::Message(heap_pos) | + Value::String(heap_pos) | + Value::Struct(heap_pos) | + Value::Union(_, heap_pos) => { + // Reference to some dynamic thing which might contain ports, + // so recurse + let heap_region = &group.regions[*heap_pos as usize]; + for (value_index, embedded_value) in heap_region.iter().enumerate() { + let value_location = ValueId::Heap(*heap_pos, value_index as u32); + find_port_in_value(group, embedded_value, value_location, ports); + } + }, + _ => {}, // values we don't care about + } + } + + // Clear the ports, then scan all the available values + ports.clear(); + for (value_index, value) in &value_group.values.iter().enumerate() { + find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports); + } +} \ No newline at end of file