diff --git a/src/protocol/eval/mod.rs b/src/protocol/eval/mod.rs
index 0425444e482048420bdc13dd7449620babb0474e..cdb2e418d8644e3d1f26a3ca529c4421441be4e3 100644
--- a/src/protocol/eval/mod.rs
+++ b/src/protocol/eval/mod.rs
@@ -24,8 +24,7 @@ pub(crate) mod value;
 pub(crate) mod store;
 pub(crate) mod executor;
 pub(crate) mod error;
-
 pub use error::EvalError;
-pub use value::{PortId, Value, ValueGroup};
+pub use value::{PortId, Value, ValueId, ValueGroup};
 pub use executor::{EvalContinuation, EvalResult, Prompt};
 
diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs
index 335f4c6d0864416a0135f0c9522eb433f19f3328..e7b2d674b32c4330a5e07c5cb859aadfa740735e 100644
--- a/src/runtime2/communication.rs
+++ b/src/runtime2/communication.rs
@@ -1,4 +1,5 @@
 use crate::protocol::eval::*;
+use crate::protocol::eval::value::ValueId;
 
 use super::runtime::*;
 use super::component::*;
@@ -31,6 +32,7 @@ pub struct DataMessage {
     pub data_header: MessageDataHeader,
     pub sync_header: MessageSyncHeader,
     pub content: ValueGroup,
+    pub ports: Vec<TransmittedPort>,
 }
 
 #[derive(Debug)]
@@ -61,6 +63,14 @@ pub struct MessageDataHeader {
     pub target_port: PortId,
 }
 
+#[derive(Debug)]
+pub struct TransmittedPort {
+    pub location: ValueId, // within `content`
+    pub messages: Vec<DataMessage>, // owned by previous component
+    pub peer_comp: CompId,
+    pub peer_port: PortId,
+}
+
 // -----------------------------------------------------------------------------
 // Sync messages
 // -----------------------------------------------------------------------------
diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs
index 299c64f158dd53f70f55f17456bcb4a1d7570d8c..2044a5d5017d9723d998127ae9e8f312a689cd0d 100644
--- a/src/runtime2/component/component.rs
+++ b/src/runtime2/component/component.rs
@@ -1,9 +1,9 @@
 use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter};
 
-use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId};
+use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId};
 use crate::protocol::*;
 use crate::runtime2::*;
 use crate::runtime2::communication::*;
 
 use super::{CompCtx, CompPDL, CompId};
 use super::component_context::*;
@@ -78,6 +78,7 @@ pub(crate) enum CompMode {
     BlockedGet, // blocked because we need to receive a message on a particular port
     BlockedPut, // component is blocked because the port is blocked
     BlockedSelect, // waiting on message to complete the select statement
+    BlockedPutPorts,// blocked because we're waiting to send a data message containing ports
     StartExit, // temporary state: if encountered then we start the shutdown process.
     BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish
     Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
@@ -88,7 +89,7 @@ impl CompMode {
         use CompMode::*;
 
         match self {
-            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true,
+            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => true,
             NonSync | StartExit | BusyExit | Exit => false,
         }
     }
@@ -97,7 +98,7 @@ impl CompMode {
         use CompMode::*;
 
         match self {
-            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => false,
+            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => false,
             StartExit | BusyExit => true,
             Exit => false,
         }
@@ -256,6 +257,30 @@ pub(crate) fn default_send_data_message(
 
         return Ok(CompScheduling::Sleep);
     } else {
+        // Check if there are any ports that are being transmitted
+        let mut ports = Vec::new();
+        find_ports_in_value_group(&value, &mut ports);
+        if !ports.is_empty() {
+
+
+            for (value_location, port_id) in ports {
+                let transmitted_port_handle = comp_ctx.get_port_handle(port_id);
+                let transmitted_port = comp_ctx.get_port(transmitted_port_handle);
+
+                if transmitted_port.state.is_set(PortStateFlag::Transmitted) {
+                    // Note: We could also attach where the port has been
+                    // transferred
+                    return Err((
+                        port_info.last_instruction,
+                        String::from("Cannot send this message, as it contains a previously transmitted port")
+                    ));
+                }
+
+                // Prepare ack for PPC
+                // Prepare PPC message
+            }
+        }
+
         // Port is not blocked, so send to the peer
         let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
         let peer_info = comp_ctx.get_peer(peer_handle);
@@ -800,6 +825,10 @@ fn default_handle_recently_unblocked_port(
     debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller
 
     if exec_state.is_blocked_on_put(port_id) {
+        // Return to the regular execution mode
+        exec_state.mode = CompMode::Sync;
+        exec_state.mode_port = PortId::new_invalid();
+
         // Annotate the message that we're going to send
         let port_info = comp_ctx.get_port(port_handle); // for immutable access
         debug_assert_eq!(port_info.kind, PortKind::Putter);
@@ -825,3 +854,45 @@ pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
     return EvalPortId{ id: port_id.0 };
 }
+
+/// Recursively scans the value group for ports, recording where each one is stored.
+/// `ports` is cleared first; a port occurring more than once is only added once.
+pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<(ValueId, PortId)>) {
+    // Helper to check a value for a port and recurse if needed.
+    fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut Vec<(ValueId, PortId)>) {
+        match value {
+            Value::Input(port_id) | Value::Output(port_id) => {
+                // This is an actual port
+                let cur_port = PortId(port_id.id);
+                for prev_port in ports.iter() {
+                    if prev_port.1 == cur_port {
+                        // Already added
+                        return;
+                    }
+                }
+
+                ports.push((value_location, cur_port));
+            },
+            Value::Array(heap_pos) |
+            Value::Message(heap_pos) |
+            Value::String(heap_pos) |
+            Value::Struct(heap_pos) |
+            Value::Union(_, heap_pos) => {
+                // Reference to some dynamic thing which might contain ports,
+                // so recurse
+                let heap_region = &group.regions[*heap_pos as usize];
+                for (value_index, embedded_value) in heap_region.iter().enumerate() {
+                    let value_location = ValueId::Heap(*heap_pos, value_index as u32);
+                    find_port_in_value(group, embedded_value, value_location, ports);
+                }
+            },
+            _ => {}, // values we don't care about
+        }
+    }
+
+    // Clear the ports, then scan all the available values
+    ports.clear();
+    for (value_index, value) in value_group.values.iter().enumerate() {
+        find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports);
+    }
+}
\ No newline at end of file
diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs
index b2c3435d5606f217b687597005aec6e3b4e3a262..3a72bdffe37e35420b89c72d12824a8a517d5ea1 100644
--- a/src/runtime2/component/component_context.rs
+++ b/src/runtime2/component/component_context.rs
@@ -29,6 +29,7 @@ pub enum PortStateFlag {
     Closed = 0x01, // If not closed, then the port is open
     BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked
     BlockedDueToFullBuffers = 0x04,
+    Transmitted = 0x08, // Transmitted, so cannot be used anymore
 }
 
 #[derive(Copy, Clone)]
@@ -307,11 +308,6 @@ impl CompCtx {
     // Local utilities
     // -------------------------------------------------------------------------
 
-    #[inline]
-    fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool {
-        return (!port.state.is_closed() || required_if_closed) && port.peer_comp_id != self_id;
-    }
-
     fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
         for (index, port) in self.ports.iter().enumerate() {
             if port.self_id == handle.0 {
diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs
index ed94ded96a5f9d0277ae3ffe542378e49cf3b5f3..87aa4e0183237eb260948b5442ad319e291f09ed 100644
--- a/src/runtime2/component/component_pdl.rs
+++ b/src/runtime2/component/component_pdl.rs
@@ -774,47 +774,6 @@ impl CompPDL {
     }
 }
 
-/// Recursively goes through the value group, attempting to find ports.
-/// Duplicates will only be added once.
-pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
-    // Helper to check a value for a port and recurse if needed.
-    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
-        match value {
-            Value::Input(port_id) | Value::Output(port_id) => {
-                // This is an actual port
-                let cur_port = PortId(port_id.id);
-                for prev_port in ports.iter() {
-                    if *prev_port == cur_port {
-                        // Already added
-                        return;
-                    }
-                }
-
-                ports.push(cur_port);
-            },
-            Value::Array(heap_pos) |
-            Value::Message(heap_pos) |
-            Value::String(heap_pos) |
-            Value::Struct(heap_pos) |
-            Value::Union(_, heap_pos) => {
-                // Reference to some dynamic thing which might contain ports,
-                // so recurse
-                let heap_region = &group.regions[*heap_pos as usize];
-                for embedded_value in heap_region {
-                    find_port_in_value(group, embedded_value, ports);
-                }
-            },
-            _ => {}, // values we don't care about
-        }
-    }
-
-    // Clear the ports, then scan all the available values
-    ports.clear();
-    for value in &value_group.values {
-        find_port_in_value(value_group, value, ports);
-    }
-}
-
 struct ValueGroupPortIter<'a> {
     group: &'a mut ValueGroup,
     heap_stack: Vec<(usize, usize)>,