diff --git a/src/protocol/eval/mod.rs b/src/protocol/eval/mod.rs index 0425444e482048420bdc13dd7449620babb0474e..cdb2e418d8644e3d1f26a3ca529c4421441be4e3 100644 --- a/src/protocol/eval/mod.rs +++ b/src/protocol/eval/mod.rs @@ -24,8 +24,7 @@ pub(crate) mod value; pub(crate) mod store; pub(crate) mod executor; pub(crate) mod error; - pub use error::EvalError; -pub use value::{PortId, Value, ValueGroup}; +pub use value::{PortId, Value, ValueId, ValueGroup}; pub use executor::{EvalContinuation, EvalResult, Prompt}; diff --git a/src/protocol/eval/value.rs b/src/protocol/eval/value.rs index c4c5060485d669df5943ff6af48090d89b0f604e..97a9f0e8b47f17bdf8d9a75297b8256b2515bea7 100644 --- a/src/protocol/eval/value.rs +++ b/src/protocol/eval/value.rs @@ -261,6 +261,14 @@ impl ValueGroup { } } + /// Retrieves a mutable reference to the value given its ValueId. + pub(crate) fn get_value_mut(&mut self, id: ValueId) -> &mut Value { + match id { + ValueId::Stack(pos) => return &mut self.values[pos as usize], + ValueId::Heap(heap_pos, pos) => return &mut self.regions[heap_pos as usize][pos as usize], + } + } + fn provide_value(&self, value: &Value, to_store: &mut Store) -> Value { if let Some(from_heap_pos) = value.get_heap_pos() { let from_heap_pos = from_heap_pos as usize; diff --git a/src/protocol/parser/mod.rs b/src/protocol/parser/mod.rs index 8f1500639951df5c37527325b710f7e2f3e88097..b3c7dd4b763992e145130a42b861333a4122ae3a 100644 --- a/src/protocol/parser/mod.rs +++ b/src/protocol/parser/mod.rs @@ -327,7 +327,13 @@ impl Parser { // Make sure directory exists let path = Path::new(&base_path); if !path.exists() { - return Err(format!("std lib root directory '{}' does not exist", base_path)); + let from_env_message = if from_env { + format!(" (retrieved from the environment variable '{}')", REOWOLF_PATH_ENV) + } else { + String::new() + }; + + return Err(format!("std lib root directory '{}'{} does not exist", base_path, from_env_message)); } // Try to load all standard library files. We might need a more unified diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index 335f4c6d0864416a0135f0c9522eb433f19f3328..803db6e48ef4c9ef920d8ffc26b4a86a2d13ad86 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -1,4 +1,5 @@ use crate::protocol::eval::*; +use crate::protocol::eval::value::ValueId; use super::runtime::*; use super::component::*; @@ -31,6 +32,7 @@ pub struct DataMessage { pub data_header: MessageDataHeader, pub sync_header: MessageSyncHeader, pub content: ValueGroup, + pub ports: Vec, } #[derive(Debug)] @@ -61,6 +63,16 @@ pub struct MessageDataHeader { pub target_port: PortId, } +#[derive(Debug)] +pub struct TransmittedPort { + pub locations: Vec, // within `content` + pub messages: Vec, // owned by previous component + pub peer_comp: CompId, + pub peer_port: PortId, + pub kind: PortKind, + pub state: PortState, +} + // ----------------------------------------------------------------------------- // Sync messages // ----------------------------------------------------------------------------- diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 299c64f158dd53f70f55f17456bcb4a1d7570d8c..d5712961e4599ebea7ed0b73bbdf3aa03a2d2c81 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -1,6 +1,11 @@ +/* + * Default toolkit for creating components. Contains handlers for initiating and + * responding to various events. + */ + use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter}; -use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId}; +use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId}; use crate::protocol::*; use crate::runtime2::*; use crate::runtime2::communication::*; @@ -78,6 +83,10 @@ pub(crate) enum CompMode { BlockedGet, // blocked because we need to receive a message on a particular port BlockedPut, // component is blocked because the port is blocked BlockedSelect, // waiting on message to complete the select statement + PutPortsBlockedTransferredPorts, // sending a message with ports, those sent ports are (partly) blocked + PutPortsBlockedAwaitingAcks, // sent out PPC message for blocking transferred ports, now awaiting Acks + PutPortsBlockedSendingPort, // sending a message with ports, message sent through a still-blocked port + NewComponentBlocked, // waiting until ports are in the appropriate state to create a new component StartExit, // temporary state: if encountered then we start the shutdown process. BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 @@ -88,8 +97,11 @@ impl CompMode { use CompMode::*; match self { - Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true, - NonSync | StartExit | BusyExit | Exit => false, + Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | + PutPortsBlockedTransferredPorts | + PutPortsBlockedAwaitingAcks | + PutPortsBlockedSendingPort => true, + NonSync | NewComponentBlocked | StartExit | BusyExit | Exit => false, } } @@ -97,7 +109,11 @@ impl CompMode { use CompMode::*; match self { - NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => false, + NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | + PutPortsBlockedTransferredPorts | + PutPortsBlockedAwaitingAcks | + PutPortsBlockedSendingPort | + NewComponentBlocked => false, StartExit | BusyExit => true, Exit => false, } @@ -138,6 +154,7 @@ pub(crate) struct CompExecState { pub mode: CompMode, pub mode_port: PortId, // valid if blocked on a port (put/get) pub mode_value: ValueGroup, // valid if blocked on a put + pub mode_component: (ProcedureDefinitionId, TypeId), pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode } @@ -147,6 +164,7 @@ impl CompExecState { mode: CompMode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), + mode_component: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()), exit_reason: ExitReason::Termination, } } @@ -162,23 +180,42 @@ impl CompExecState { debug_assert!(self.mode_value.values.is_empty()); } + pub(crate) fn set_as_create_component_blocked( + &mut self, proc_id: ProcedureDefinitionId, type_id: TypeId, + arguments: ValueGroup + ) { + self.mode = CompMode::NewComponentBlocked; + self.mode_value = arguments; + self.mode_component = (proc_id, type_id); + } + pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool { return self.mode == CompMode::BlockedGet && self.mode_port == port; } - pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) { + pub(crate) fn set_as_blocked_put_without_ports(&mut self, port: PortId, value: ValueGroup) { self.mode = CompMode::BlockedPut; self.mode_port = port; self.mode_value = value; } - pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool { + pub(crate) fn set_as_blocked_put_with_ports(&mut self, port: PortId, value: ValueGroup) { + self.mode = CompMode::PutPortsBlockedTransferredPorts; + self.mode_port = port; + self.mode_value = value; + } + + pub(crate) fn is_blocked_on_put_without_ports(&self, port: PortId) -> bool { return self.mode == CompMode::BlockedPut && self.mode_port == port; } + + pub(crate) fn is_blocked_on_create_component(&self) -> bool { + return self.mode == CompMode::NewComponentBlocked; + } } // TODO: Replace when implementing port sending. Should probably be incorporated @@ -232,7 +269,8 @@ pub(crate) fn create_component( pub(crate) fn default_send_data_message( exec_state: &mut CompExecState, transmitting_port_id: PortId, port_instruction: PortInstruction, value: ValueGroup, - sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx + sched_ctx: &SchedulerCtx, consensus: &mut Consensus, + control: &mut ControlLayer, comp_ctx: &mut CompCtx ) -> Result { debug_assert_eq!(exec_state.mode, CompMode::Sync); @@ -243,6 +281,9 @@ pub(crate) fn default_send_data_message( let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); + let mut ports = Vec::new(); + find_ports_in_value_group(&value, &mut ports); + if port_info.state.is_closed() { // Note: normally peer is eventually consistent, but if it has shut down // then we can be sure it is consistent (I think?) @@ -250,13 +291,20 @@ pub(crate) fn default_send_data_message( port_info.last_instruction, format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0) )) + } else if !ports.is_empty() { + start_send_message_with_ports( + transmitting_port_id, port_instruction, value, exec_state, + comp_ctx, sched_ctx, control + )?; + + return Ok(CompScheduling::Sleep); } else if port_info.state.is_blocked() { // Port is blocked, so we cannot send - exec_state.set_as_blocked_put(transmitting_port_id, value); + exec_state.set_as_blocked_put_without_ports(transmitting_port_id, value); return Ok(CompScheduling::Sleep); } else { - // Port is not blocked, so send to the peer + // Port is not blocked and no ports to transfer: send to the peer let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); @@ -337,7 +385,7 @@ pub(crate) enum GetResult { /// instruction we're attempting on this port. pub(crate) fn default_attempt_get( exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction, - inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus ) -> GetResult { let port_handle = comp_ctx.get_port_handle(target_port); @@ -356,14 +404,15 @@ pub(crate) fn default_attempt_get( if let Some(message) = &inbox_main[port_index] { if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { // We're allowed to receive this message - let message = inbox_main[port_index].take().unwrap(); + let mut message = inbox_main[port_index].take().unwrap(); debug_assert_eq!(target_port, message.data_header.target_port); // Note: we can still run into an unrecoverable error when actually // receiving this message match default_handle_received_data_message( - target_port, target_port_instruction, inbox_main, inbox_backup, - comp_ctx, sched_ctx, control, + target_port, target_port_instruction, + &mut message, inbox_main, inbox_backup, + comp_ctx, sched_ctx, control, consensus ) { Ok(()) => return GetResult::Received(message), Err(location_and_message) => return GetResult::Error(location_and_message) @@ -389,14 +438,70 @@ pub(crate) fn default_attempt_get( /// of full buffers (hence, will use the control layer to make sure the peer /// will become unblocked). pub(crate) fn default_handle_received_data_message( - targeted_port: PortId, port_instruction: PortInstruction, - inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, - comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer + targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, + comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer, + consensus: &mut Consensus ) -> Result<(), (PortInstruction, String)> { let port_handle = comp_ctx.get_port_handle(targeted_port); let port_index = comp_ctx.get_port_index(port_handle); - let slot = &mut inbox_main[port_index]; - debug_assert!(slot.is_none()); // because we've just received from it + debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it + + // If we received any ports, add them to the port tracking and inbox struct. + // Then notify the peers that they can continue sending to this port, but + // now at a new address. + for received_port in &mut message.ports { + // Transfer messages to main/backup inbox + let _new_inbox_index = inbox_main.len(); + if !received_port.messages.is_empty() { + inbox_main.push(Some(received_port.messages.remove(0))); + inbox_backup.extend(received_port.messages.drain(..)); + } else { + inbox_main.push(None); + } + + // Create a new port locally + let mut new_port_state = received_port.state; + new_port_state.set(PortStateFlag::Received); + let new_port_handle = comp_ctx.add_port( + received_port.peer_comp, received_port.peer_port, + received_port.kind, new_port_state + ); + debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle)); + comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp)); + let new_port = comp_ctx.get_port(new_port_handle); + + // Add the port tho the consensus + consensus.notify_received_port(_new_inbox_index, new_port_handle, comp_ctx); + + // Replace all references to the port in the received message + for message_location in received_port.locations.iter().copied() { + let value = message.content.get_value_mut(message_location); + + match value { + Value::Input(_) => { + debug_assert_eq!(new_port.kind, PortKind::Getter); + *value = Value::Input(port_id_to_eval(new_port.self_id)); + }, + Value::Output(_) => { + debug_assert_eq!(new_port.kind, PortKind::Putter); + *value = Value::Output(port_id_to_eval(new_port.self_id)); + }, + _ => unreachable!(), + } + } + + // Let the peer know that the port can now be used + let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + + peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{ + id: ControlId::new_invalid(), + sender_comp_id: comp_ctx.id, + target_port_id: Some(new_port.peer_port_id), + content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id) + }), true); + } // Modify last-known location where port instruction was retrieved let port_info = comp_ctx.get_port(port_handle); @@ -410,7 +515,7 @@ pub(crate) fn default_handle_received_data_message( // One more message, place it in the slot let message = inbox_backup.remove(message_index); debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup - *slot = Some(message); + inbox_main[port_index] = Some(message); return Ok(()); } @@ -437,11 +542,12 @@ pub(crate) fn default_handle_received_data_message( /// state may all change. pub(crate) fn default_handle_control_message( exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, - message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx + message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) -> Result<(), (PortInstruction, String)> { match message.content { ControlMessageContent::Ack => { - default_handle_ack(control, message.id, sched_ctx, comp_ctx); + default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?; }, ControlMessageContent::BlockPort => { // One of our messages was accepted, but the port should be @@ -474,7 +580,7 @@ pub(crate) fn default_handle_control_message( // The two components (sender and this component) are closing // the channel at the same time. So we don't care about the // content of the `ClosePort` message. - default_handle_ack(control, control_id, sched_ctx, comp_ctx); + default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?; } else { // Respond to the message let port_info = comp_ctx.get_port(port_handle); @@ -495,7 +601,7 @@ pub(crate) fn default_handle_control_message( if exec_state.mode.is_in_sync_block() { let closed_during_sync_round = content.closed_in_sync_round && port_was_used; - let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message; + let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used; if closed_during_sync_round || closed_before_sync_round { return Err(( @@ -519,7 +625,10 @@ pub(crate) fn default_handle_control_message( debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers); - default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + default_handle_recently_unblocked_port( + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup + )?; }, ControlMessageContent::PortPeerChangedBlock => { // The peer of our port has just changed. So we are asked to @@ -541,14 +650,16 @@ pub(crate) fn default_handle_control_message( let port_handle = comp_ctx.get_port_handle(port_to_change); let port_info = comp_ctx.get_port(port_handle); debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange)); - let old_peer_id = port_info.peer_comp_id; let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_port_id = new_port_id; port_info.state.clear(PortStateFlag::BlockedDueToPeerChange); comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id)); - default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + default_handle_recently_unblocked_port( + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup + )?; } } @@ -603,6 +714,13 @@ pub(crate) fn default_handle_start_exit( sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::StartExit); + for port_index in 0..comp_ctx.num_ports() { + let port_info = comp_ctx.get_port_by_index_mut(port_index); + if port_info.state.is_blocked() { + return CompScheduling::Sleep; + } + } + sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason)); exec_state.mode = CompMode::BusyExit; let exit_inside_sync = exec_state.exit_reason.is_in_sync(); @@ -617,7 +735,8 @@ pub(crate) fn default_handle_start_exit( // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); - if port.state.is_closed() || port.close_at_sync_end { + println!("DEBUG: Considering port:\n{:?}", port); + if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end { // Already closed, or in the process of being closed continue; } @@ -691,6 +810,7 @@ pub(crate) fn default_handle_sync_decision( if port_info.close_at_sync_end { port_info.state.set(PortStateFlag::Closed); } + port_info.state.clear(PortStateFlag::Received); } debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); exec_state.mode = CompMode::NonSync; @@ -706,6 +826,240 @@ pub(crate) fn default_handle_sync_decision( } } + +pub(crate) fn default_start_create_component( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, + definition_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup +) { + debug_assert_eq!(exec_state.mode, CompMode::NonSync); + + let mut transferred_ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut transferred_ports); + + // Set execution state as waiting until we can create the component. If we + // can do so right away, then we will. + exec_state.set_as_create_component_blocked(definition_id, type_id, arguments); + if ports_not_blocked(comp_ctx, &transferred_ports) { + perform_create_component(exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup); + } +} + +/// Actually creates a component (and assumes that the caller made sure that +/// none of the ports are involved in a blocking operation). +pub(crate) fn perform_create_component( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx, + control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) { + // Small internal utilities + struct PortPair { + instantiator_id: PortId, + instantiator_handle: LocalPortHandle, + created_id: PortId, + created_handle: LocalPortHandle, + is_open: bool, + } + + // Retrieve ports from the arguments + debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked); + + let (procedure_id, procedure_type_id) = exec_state.mode_component; + let mut arguments = exec_state.mode_value.take(); + let mut ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut ports); + debug_assert!(ports_not_blocked(instantiator_ctx, &ports)); + + // Reserve a location for the new component + let reservation = sched_ctx.runtime.start_create_component(); + let mut created_ctx = CompCtx::new(&reservation); + + let mut port_pairs = Vec::with_capacity(ports.len()); + + // Go over all the ports that will be transferred. Since the ports will get + // a new ID in the new component, we will take care of that here. + for (port_location, instantiator_port_id) in &ports { + // Retrieve port information from instantiator + let instantiator_port_id = *instantiator_port_id; + let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id); + let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle); + + // Create port at created component + let created_port_handle = created_ctx.add_port( + instantiator_port.peer_comp_id, instantiator_port.peer_port_id, + instantiator_port.kind, instantiator_port.state + ); + let created_port = created_ctx.get_port(created_port_handle); + let created_port_id = created_port.self_id; + + // Modify port ID in the arguments to the new component and store them + // for later access + let is_open = instantiator_port.state.is_open(); + port_pairs.push(PortPair{ + instantiator_id: instantiator_port_id, + instantiator_handle: instantiator_port_handle, + created_id: created_port_id, + created_handle: created_port_handle, + is_open, + }); + + for location in port_location.iter().copied() { + let value = arguments.get_value_mut(location); + match value { + Value::Input(id) => *id = port_id_to_eval(created_port_id), + Value::Output(id) => *id = port_id_to_eval(created_port_id), + _ => unreachable!(), + } + } + } + + // For each of the ports in the newly created component we set the peer to + // the correct value. We will not yet change the peer on the instantiator's + // ports (as we haven't yet stored the new component in the runtime's + // component storage) + let mut created_component_has_remote_peers = false; + for pair in port_pairs.iter() { + let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle); + let created_port_info = created_ctx.get_port_mut(pair.created_handle); + + if created_port_info.peer_comp_id == instantiator_ctx.id { + // The peer of the created component's port seems to be the + // instantiator. + let created_port_peer_index = port_pairs.iter() + .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id); + + match created_port_peer_index { + Some(created_port_peer_index) => { + // However, the peer port is also moved to the new + // component, so the complete channel is owned by the new + // component. + let peer_pair = &port_pairs[created_port_peer_index]; + created_port_info.peer_port_id = peer_pair.created_id; + created_port_info.peer_comp_id = reservation.id(); + }, + None => { + // Peer port remains with instantiator. However, we cannot + // set the peer on the instantiator yet, because the new + // component has not yet been stored in the runtime's + // component storage. So we do this later + created_port_info.peer_comp_id = instantiator_ctx.id; + if pair.is_open { + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id)); + } + } + } + } else { + // Peer is a different component + if pair.is_open { + // And the port is still open, so we need to notify the peer + let peer_handle = instantiator_ctx.get_peer_handle(created_port_info.peer_comp_id); + let peer_info = instantiator_ctx.get_peer(peer_handle); + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id)); + created_component_has_remote_peers = true; + } + } + } + + // Now we store the new component into the runtime's component storage using + // the reservation. + let component = create_component( + &sched_ctx.runtime.protocol, procedure_id, procedure_type_id, + arguments, port_pairs.len() + ); + let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component( + reservation, component, created_ctx, false + ); + let created_ctx = &mut created_runtime_component.ctx; + let created_component = &mut created_runtime_component.component; + created_component.on_creation(created_key.downgrade(), sched_ctx); + + // We now pass along the messages that the instantiator component still has + // that belong to the new component. At the same time we'll take care of + // setting the correct peer of the instantiator component + for pair in port_pairs.iter() { + // Transferring the messages and removing the port from the + // instantiator component + let instantiator_port_index = instantiator_ctx.get_port_index(pair.instantiator_handle); + instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None); + instantiator_ctx.remove_port(pair.instantiator_handle); + + if let Some(mut message) = inbox_main[instantiator_port_index].take() { + message.data_header.target_port = pair.created_id; + created_component.adopt_message(created_ctx, message); + } + + let mut message_index = 0; + while message_index < inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == pair.instantiator_id { + // Transfer the message + let mut message = inbox_backup.remove(message_index); + message.data_header.target_port = pair.created_id; + created_component.adopt_message(created_ctx, message); + } else { + // Message does not belong to the port pair that we're + // transferring to the new component. + message_index += 1; + } + } + + // Here we take care of the case where the instantiator previously owned + // both ends of the channel, but has transferred one port to the new + // component (hence creating a channel between the instantiator + // component and the new component). + let created_port_info = created_ctx.get_port(pair.created_handle); + if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id { + // Note: the port we're receiving here belongs to the instantiator + // and is NOT in the "port_pairs" array. + let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id); + let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle); + instantiator_port_info.peer_port_id = created_port_info.self_id; + instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id)); + } + } + + // Finally: if we did move ports around whose peers are different + // components, then we'll initiate the appropriate protocol to notify them. + if created_component_has_remote_peers { + let schedule_entry_id = control.add_schedule_entry(created_ctx.id); + for pair in &port_pairs { + let port_info = created_ctx.get_port(pair.created_handle); + if pair.is_open && port_info.peer_comp_id != instantiator_ctx.id && port_info.peer_comp_id != created_ctx.id { + // Peer component is not the instantiator, and it is not the + // new component itself + let message = control.add_reroute_entry( + instantiator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id, + pair.instantiator_id, pair.created_id, created_ctx.id, + schedule_entry_id + ); + let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = created_ctx.get_peer(peer_handle); + + peer_info.handle.send_message_logged(sched_ctx, message, true); + } + } + } else { + // We can schedule the component immediately, we do not have to wait + // for any peers: there are none. + sched_ctx.runtime.enqueue_work(created_key); + } + + exec_state.mode = CompMode::NonSync; + exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()); +} + +pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> bool { + for (_port_locations, port_id) in ports { + let port_handle = comp_ctx.get_port_handle(*port_id); + let port_info = comp_ctx.get_port(port_handle); + + if port_info.state.is_blocked_due_to_port_change() { + return false; + } + } + + return true; +} + /// Performs the default action of printing the provided error, and then putting /// the component in the state where it will shut down. Only to be used for /// builtin components: their error message construction is simpler (and more @@ -736,13 +1090,185 @@ pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling // Internal messaging/state utilities // ----------------------------------------------------------------------------- +/// Sends a message without any transmitted ports. Does not check if sending +/// is actually valid. +fn send_message_without_ports( + sending_port_handle: LocalPortHandle, value: ValueGroup, + comp_ctx: &CompCtx, sched_ctx: &SchedulerCtx, consensus: &mut Consensus, +) { + let port_info = comp_ctx.get_port(sending_port_handle); + debug_assert!(port_info.state.can_send()); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + + let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); +} + +/// Prepares sending a message that contains ports. Only once a particular +/// protocol has completed (where we notify all the peers that the ports will +/// be transferred) will we actually send the message to the recipient. +fn start_send_message_with_ports( + sending_port_id: PortId, sending_port_instruction: PortInstruction, value: ValueGroup, + exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, + control: &mut ControlLayer +) -> Result<(), (PortInstruction, String)> { + debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send + + // Retrieve ports we're going to transfer + let sending_port_handle = comp_ctx.get_port_handle(sending_port_id); + let sending_port_info = comp_ctx.get_port_mut(sending_port_handle); + sending_port_info.last_instruction = sending_port_instruction; + + let mut transmit_ports = Vec::new(); + find_ports_in_value_group(&value, &mut transmit_ports); + debug_assert!(!transmit_ports.is_empty()); // required from caller + + // Enter the state where we'll wait until all transferred ports are not + // blocked. + exec_state.set_as_blocked_put_with_ports(sending_port_id, value); + + if ports_not_blocked(comp_ctx, &transmit_ports) { + // Ports are not blocked, so we can send them right away. + perform_send_message_with_ports_notify_peers( + exec_state, comp_ctx, sched_ctx, control, transmit_ports + )?; + } // else: wait until they become unblocked + + return Ok(()) +} + +fn perform_send_message_with_ports_notify_peers( + exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, + control: &mut ControlLayer, transmit_ports: EncounteredPorts +) -> Result<(), (PortInstruction, String)> { + // Check we're in the correct state in debug mode + debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedTransferredPorts); + debug_assert!(ports_not_blocked(comp_ctx, &transmit_ports)); + + // Set up the final Ack that triggers us to send our final message + let unblock_put_control_id = control.add_unblock_put_with_ports_entry(); + for (_, port_id) in &transmit_ports { + let transmit_port_handle = comp_ctx.get_port_handle(*port_id); + let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle); + let peer_comp_id = transmit_port_info.peer_comp_id; + let peer_port_id = transmit_port_info.peer_port_id; + + + // Note: we checked earlier that we are currently in sync mode. Now we + // will check if we've already used the port we're about to transmit. + if !transmit_port_info.last_instruction.is_none() { + let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction; + return Err(( + sending_port_instruction, + String::from("Cannot transmit one of the ports in this message, as it is used in this sync round") + )); + } + + if transmit_port_info.state.is_set(PortStateFlag::Transmitted) { + let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction; + return Err(( + sending_port_instruction, + String::from("Cannot transmit one of the ports in this message, as that port is already transmitted") + )); + } + + // Set the flag for transmission + transmit_port_info.state.set(PortStateFlag::Transmitted); + + // Block the peer of the port + let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id); + println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message); + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + + peer_info.handle.send_message_logged(sched_ctx, message, true); + } + + // We've set up the protocol, once all the PPC's are blocked we are supposed + // to transfer the message to the recipient. So store it temporarily + exec_state.mode = CompMode::PutPortsBlockedAwaitingAcks; + + return Ok(()); +} + +/// Performs the transmission of a data message that contains ports. These were +/// all stored in the component's execution state by the +/// `prepare_send_message_with_ports` function. Port must be ready to send! +fn perform_send_message_with_ports_to_receiver( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) -> Result<(), (PortInstruction, String)> { + debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedSendingPort); + + // Find all ports again + let mut transmit_ports = Vec::new(); + find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports); + + // Retrieve the port over which we're going to send the message + let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + let port_info = comp_ctx.get_port(port_handle); + + if !port_info.state.is_open() { + return Err(( + port_info.last_instruction, + String::from("cannot send over this port, as it is closed") + )); + } + + debug_assert!(!port_info.state.is_blocked_due_to_port_change()); // caller should have checked this + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + + // Change state back to its default + exec_state.mode = CompMode::Sync; + let message_value = exec_state.mode_value.take(); + exec_state.mode_port = PortId::new_invalid(); + + // Annotate the data message + let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value); + + // And further enhance the message by adding data about the ports that are + // being transferred + for (port_locations, transmit_port_id) in transmit_ports { + let transmit_port_handle = comp_ctx.get_port_handle(transmit_port_id); + let transmit_port_info = comp_ctx.get_port(transmit_port_handle); + + let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup); + + let mut transmit_port_state = transmit_port_info.state; + debug_assert!(transmit_port_state.is_set(PortStateFlag::Transmitted)); + transmit_port_state.clear(PortStateFlag::Transmitted); + + annotated_message.ports.push(TransmittedPort{ + locations: port_locations, + messages: transmit_messages, + peer_comp: transmit_port_info.peer_comp_id, + peer_port: transmit_port_info.peer_port_id, + kind: transmit_port_info.kind, + state: transmit_port_state + }); + + comp_ctx.change_port_peer(sched_ctx, transmit_port_handle, None); + } + + // And finally, send the message to the peer + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); + + return Ok(()); +} + /// Handles an `Ack` for the control layer. fn default_handle_ack( - control: &mut ControlLayer, control_id: ControlId, - sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx -) { + exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId, + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) -> Result<(), (PortInstruction, String)>{ // Since an `Ack` may cause another one, handle them in a loop let mut to_ack = control_id; + loop { let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx); match action { @@ -765,6 +1291,22 @@ fn default_handle_ack( let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); }, + AckAction::UnblockPutWithPorts => { + // Send the message (containing ports) stored in the component + // execution state to the recipient + println!("DEBUG: Unblocking put with ports"); + debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks); + exec_state.mode = CompMode::PutPortsBlockedSendingPort; + let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + + // Little bit of a hack, we didn't really unblock the sending + // port, but this will mesh nicely with waiting for the sending + // port to become unblocked. + default_handle_recently_unblocked_port( + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup + )?; + }, AckAction::None => {} } @@ -773,6 +1315,8 @@ fn default_handle_ack( None => break, } } + + return Ok(()); } /// Little helper for sending the most common kind of `Ack` @@ -790,16 +1334,26 @@ fn default_send_ack( } /// Handles the unblocking of a putter port. In case there is a pending message -/// on that port then it will be sent. +/// on that port then it will be sent. There are two reasons for calling this +/// function: either a port was blocked (i.e. the Blocked state flag was +/// cleared), or the component is ready to send a message containing ports +/// (stored in the execution state). In this latter case we might still have +/// a blocked port. fn default_handle_recently_unblocked_port( - exec_state: &mut CompExecState, consensus: &mut Consensus, + exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, -) { + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) -> Result<(), (PortInstruction, String)> { let port_info = comp_ctx.get_port_mut(port_handle); let port_id = port_info.self_id; - debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller - if exec_state.is_blocked_on_put(port_id) { + if port_info.state.is_blocked() { + // Port is still blocked. We wait until the next control message where + // we unblock the port. + return Ok(()); + } + + if exec_state.is_blocked_on_put_without_ports(port_id) { // Annotate the message that we're going to send let port_info = comp_ctx.get_port(port_handle); // for immutable access debug_assert_eq!(port_info.kind, PortKind::Putter); @@ -811,9 +1365,36 @@ fn default_handle_recently_unblocked_port( let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true); - exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state. + // Return to the regular execution mode + exec_state.mode = CompMode::Sync; exec_state.mode_port = PortId::new_invalid(); + } else if exec_state.mode == CompMode::PutPortsBlockedTransferredPorts { + // We are waiting until all of the transferred ports become unblocked, + // check so here. + let mut transfer_ports = Vec::new(); + find_ports_in_value_group(&exec_state.mode_value, &mut transfer_ports); + if ports_not_blocked(comp_ctx, &transfer_ports) { + perform_send_message_with_ports_notify_peers( + exec_state, comp_ctx, sched_ctx, control, transfer_ports + )?; + } + } else if exec_state.mode == CompMode::PutPortsBlockedSendingPort && exec_state.mode_port == port_id { + // We checked above that the port became unblocked, so we can send the + // message + perform_send_message_with_ports_to_receiver( + exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup + )?; + } else if exec_state.is_blocked_on_create_component() { + let mut ports = Vec::new(); + find_ports_in_value_group(&exec_state.mode_value, &mut ports); + if ports_not_blocked(comp_ctx, &ports) { + perform_create_component( + exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup + ); + } } + + return Ok(()); } #[inline] @@ -825,3 +1406,77 @@ pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId { pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId { return EvalPortId{ id: port_id.0 }; } + +// TODO: Optimize double vec +type EncounteredPorts = Vec<(Vec, PortId)>; + +/// Recursively goes through the value group, attempting to find ports. +/// Duplicates will only be added once. +pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut EncounteredPorts) { + // Helper to check a value for a port and recurse if needed. + fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut EncounteredPorts) { + match value { + Value::Input(port_id) | Value::Output(port_id) => { + // This is an actual port + let cur_port = PortId(port_id.id); + for prev_port in ports.iter_mut() { + if prev_port.1 == cur_port { + // Already added + prev_port.0.push(value_location); + return; + } + } + + ports.push((vec![value_location], cur_port)); + }, + Value::Array(heap_pos) | + Value::Message(heap_pos) | + Value::String(heap_pos) | + Value::Struct(heap_pos) | + Value::Union(_, heap_pos) => { + // Reference to some dynamic thing which might contain ports, + // so recurse + let heap_region = &group.regions[*heap_pos as usize]; + for (value_index, embedded_value) in heap_region.iter().enumerate() { + let value_location = ValueId::Heap(*heap_pos, value_index as u32); + find_port_in_value(group, embedded_value, value_location, ports); + } + }, + _ => {}, // values we don't care about + } + } + + // Clear the ports, then scan all the available values + ports.clear(); + for (value_index, value) in value_group.values.iter().enumerate() { + find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports); + } +} + +/// Goes through the inbox of a component and takes out all the messages that +/// are targeted at a specific port +pub(crate) fn take_port_messages( + comp_ctx: &CompCtx, port_id: PortId, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) -> Vec { + let mut messages = Vec::new(); + let port_handle = comp_ctx.get_port_handle(port_id); + let port_index = comp_ctx.get_port_index(port_handle); + + if let Some(message) = inbox_main[port_index].take() { + messages.push(message); + } + + let mut message_index = 0; + while message_index < inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == port_id { + let message = inbox_backup.remove(message_index); + messages.push(message); + } else { + message_index += 1; + } + } + + return messages; +} \ No newline at end of file diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index b2c3435d5606f217b687597005aec6e3b4e3a262..1d00c2c83fa9b0ae269fe690f659ef94cf4c9e1d 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -14,6 +14,15 @@ pub enum PortInstruction { SourceLocation(ExpressionId), } +impl PortInstruction { + pub fn is_none(&self) -> bool { + match self { + PortInstruction::None => return true, + _ => return false, + } + } +} + /// Directionality of a port #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum PortKind { @@ -29,6 +38,8 @@ pub enum PortStateFlag { Closed = 0x01, // If not closed, then the port is open BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked BlockedDueToFullBuffers = 0x04, + Transmitted = 0x08, // Transmitted, so cannot be used anymore + Received = 0x10, // Received, so cannot be used yet, only after the sync round } #[derive(Copy, Clone)] @@ -48,6 +59,14 @@ impl PortState { return !self.is_closed(); } + #[inline] + pub fn can_send(&self) -> bool { + return + !self.is_set(PortStateFlag::Closed) && + !self.is_set(PortStateFlag::Transmitted) && + !self.is_set(PortStateFlag::Received); + } + #[inline] pub fn is_closed(&self) -> bool { return self.is_set(PortStateFlag::Closed); @@ -60,6 +79,11 @@ impl PortState { self.is_set(PortStateFlag::BlockedDueToFullBuffers); } + #[inline] + pub fn is_blocked_due_to_port_change(&self) -> bool { + return self.is_set(PortStateFlag::BlockedDueToPeerChange); + } + // lower-level utils #[inline] pub fn set(&mut self, flag: PortStateFlag) { @@ -85,7 +109,8 @@ impl Debug for PortState { for (flag_name, flag_value) in &[ ("closed", Closed), ("blocked_peer_change", BlockedDueToPeerChange), - ("blocked_full_buffers", BlockedDueToFullBuffers) + ("blocked_full_buffers", BlockedDueToFullBuffers), + ("transmitted", Transmitted), ] { s.field(flag_name, &self.is_set(*flag_value)); } @@ -176,7 +201,7 @@ impl CompCtx { return Channel{ putter_id, getter_id }; } - /// Adds a new port. Make sure to call `add_peer` afterwards. + /// Adds a new port. Make sure to call `change_peer` afterwards. pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle { let self_id = PortId(self.take_port_id()); self.ports.push(Port{ @@ -189,7 +214,28 @@ impl CompCtx { return LocalPortHandle(self_id); } - /// Removes a port. Make sure you called `remove_peer` first. + /// Adds a self-reference. Called by the runtime/scheduler + pub(crate) fn add_self_reference(&mut self, self_handle: CompHandle) { + debug_assert_eq!(self.id, self_handle.id()); + debug_assert!(self.get_peer_index_by_id(self.id).is_none()); + self.peers.push(Peer{ + id: self.id, + num_associated_ports: 0, + handle: self_handle + }); + } + + /// Removes a self-reference. Called by the runtime/scheduler + pub(crate) fn remove_self_reference(&mut self) -> Option { + let self_index = self.get_peer_index_by_id(self.id).unwrap(); + let peer = &mut self.peers[self_index]; + let maybe_comp_key = peer.handle.decrement_users(); + self.peers.remove(self_index); + + return maybe_comp_key; + } + + /// Removes a port. Make sure you called `change_peer` first. pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port { let port_index = self.must_get_port_index(port_handle); let port = self.ports.remove(port_index); @@ -307,11 +353,6 @@ impl CompCtx { // Local utilities // ------------------------------------------------------------------------- - #[inline] - fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool { - return (!port.state.is_closed() || required_if_closed) && port.peer_comp_id != self_id; - } - fn must_get_port_index(&self, handle: LocalPortHandle) -> usize { for (index, port) in self.ports.iter().enumerate() { if port.self_id == handle.0 { diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index c6fa23e95c3e2a1622231c5f38d284391ae3dfc7..4d508f2545e458c5747fd9fdd2eed7fe9d269b92 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -110,7 +110,7 @@ impl Component for ComponentTcpClient { Message::Control(message) => { if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, - message, sched_ctx, comp_ctx + message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup ) { component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); } @@ -125,7 +125,11 @@ impl Component for ComponentTcpClient { sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state)); match self.exec_state.mode { - CompMode::BlockedSelect => { + CompMode::BlockedSelect | + CompMode::PutPortsBlockedTransferredPorts | + CompMode::PutPortsBlockedAwaitingAcks | + CompMode::PutPortsBlockedSendingPort | + CompMode::NewComponentBlocked => { // Not possible: we never enter this state unreachable!(); }, @@ -239,7 +243,11 @@ impl Component for ComponentTcpClient { Ok(num_received) => { self.byte_buffer.resize(num_received, 0); let message_content = self.bytes_to_data_message_content(&self.byte_buffer); - let send_result = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, message_content, sched_ctx, &mut self.consensus, comp_ctx); + let send_result = component::default_send_data_message( + &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, + message_content, sched_ctx, &mut self.consensus, &mut self.control, comp_ctx + ); + if let Err(location_and_message) = send_result { component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); return CompScheduling::Immediate; diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index ed94ded96a5f9d0277ae3ffe542378e49cf3b5f3..36f5c267d82c7242ee2da12950b07ab58a05f5b1 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -218,7 +218,7 @@ pub(crate) struct CompPDL { // Should be same length as the number of ports. Corresponding indices imply // message is intended for that port. pub inbox_main: InboxMain, - pub inbox_backup: Vec, + pub inbox_backup: InboxBackup, } impl Component for CompPDL { @@ -241,15 +241,17 @@ impl Component for CompPDL { } fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { - // sched_ctx.log(&format!("handling message: {:?}", message)); + sched_ctx.debug(&format!("handling message: {:?}", message)); if let Some(new_target) = self.control.should_reroute(&mut message) { let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle + sched_ctx.debug(&format!("rerouting to: {:?}", new_target)); target.send_message_logged(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks let _should_remove = target.decrement_users(); debug_assert!(_should_remove.is_none()); return; } + sched_ctx.debug("handling message myself"); match message { Message::Data(message) => { self.handle_incoming_data_message(sched_ctx, comp_ctx, message); @@ -257,7 +259,7 @@ impl Component for CompPDL { Message::Control(message) => { if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, - message, sched_ctx, comp_ctx + message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup ) { self.handle_generic_component_error(sched_ctx, location_and_message); } @@ -282,7 +284,10 @@ impl Component for CompPDL { CompMode::NonSync | CompMode::Sync => { // continue and run PDL code }, - CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => { + CompMode::SyncEnd | CompMode::BlockedGet | + CompMode::BlockedPut | CompMode::BlockedSelect | CompMode::PutPortsBlockedTransferredPorts | + CompMode::PutPortsBlockedAwaitingAcks | CompMode::PutPortsBlockedSendingPort | + CompMode::NewComponentBlocked => { return CompScheduling::Sleep; } CompMode::StartExit => return component::default_handle_start_exit( @@ -342,7 +347,7 @@ impl Component for CompPDL { let send_result = component::default_send_data_message( &mut self.exec_state, target_port_id, PortInstruction::SourceLocation(expr_id), value, - sched_ctx, &mut self.consensus, comp_ctx + sched_ctx, &mut self.consensus, &mut self.control, comp_ctx ); if let Err(location_and_message) = send_result { self.handle_generic_component_error(sched_ctx, location_and_message); @@ -420,14 +425,14 @@ impl Component for CompPDL { }, EC::NewComponent(definition_id, type_id, arguments) => { debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); - self.create_component_and_transfer_ports( - sched_ctx, comp_ctx, + component::default_start_create_component( + &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control, + &mut self.inbox_main, &mut self.inbox_backup, definition_id, type_id, arguments ); return CompScheduling::Requeue; }, EC::NewChannel => { - debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); debug_assert!(self.exec_ctx.stmt.is_none()); let channel = comp_ctx.create_channel(); self.exec_ctx.stmt = ExecStmt::CreatedChannel(( @@ -552,343 +557,4 @@ impl CompPDL { self.exec_state.set_as_start_exit(exit_reason); } - - // ------------------------------------------------------------------------- - // Handling ports - // ------------------------------------------------------------------------- - - /// Creates a new component and transfers ports. Because of the stepwise - /// process in which memory is allocated, ports are transferred, messages - /// are exchanged, component lifecycle methods are called, etc. This - /// function facilitates a lot of implicit assumptions (e.g. when the - /// `Component::on_creation` method is called, the component is already - /// registered at the runtime). - fn create_component_and_transfer_ports( - &mut self, - sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, - definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup - ) { - struct PortPair{ - creator_handle: LocalPortHandle, - creator_id: PortId, - created_handle: LocalPortHandle, - created_id: PortId, - } - let mut opened_port_id_pairs = Vec::new(); - let mut closed_port_id_pairs = Vec::new(); - - let reservation = sched_ctx.runtime.start_create_pdl_component(); - let mut created_ctx = CompCtx::new(&reservation); - - // let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; - // let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; - // dbg_code!({ - // sched_ctx.log(&format!( - // "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", - // self_proc.identifier.value.as_str(), creator_ctx.id, - // other_proc.identifier.value.as_str(), reservation.id() - // )); - // }); - - // Take all the ports ID that are in the `args` (and currently belong to - // the creator component) and translate them into new IDs that are - // associated with the component we're about to create - let mut arg_iter = ValueGroupPortIter::new(&mut arguments); - while let Some(port_reference) = arg_iter.next() { - // Create port entry for new component - let creator_port_id = port_reference.id; - let creator_port_handle = creator_ctx.get_port_handle(creator_port_id); - let creator_port = creator_ctx.get_port(creator_port_handle); - let created_port_handle = created_ctx.add_port( - creator_port.peer_comp_id, creator_port.peer_port_id, - creator_port.kind, creator_port.state - ); - let created_port = created_ctx.get_port(created_port_handle); - let created_port_id = created_port.self_id; - - let port_id_pair = PortPair { - creator_handle: creator_port_handle, - creator_id: creator_port_id, - created_handle: created_port_handle, - created_id: created_port_id, - }; - - if creator_port.state.is_closed() { - closed_port_id_pairs.push(port_id_pair) - } else { - opened_port_id_pairs.push(port_id_pair); - } - - // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues) - let arg_value = if let Some(heap_pos) = port_reference.heap_pos { - &mut arg_iter.group.regions[heap_pos][port_reference.index] - } else { - &mut arg_iter.group.values[port_reference.index] - }; - match arg_value { - Value::Input(id) => *id = port_id_to_eval(created_port_id), - Value::Output(id) => *id = port_id_to_eval(created_port_id), - _ => unreachable!(), - } - } - - // For each transferred port pair set their peer components to the - // correct values. This will only change the values for the ports of - // the new component. - let mut created_component_has_remote_peers = false; - - for pair in opened_port_id_pairs.iter() { - let creator_port_info = creator_ctx.get_port(pair.creator_handle); - let created_port_info = created_ctx.get_port_mut(pair.created_handle); - - if created_port_info.peer_comp_id == creator_ctx.id { - // Peer of the transferred port is the component that is - // creating the new component. - let created_peer_port_index = opened_port_id_pairs - .iter() - .position(|v| v.creator_id == creator_port_info.peer_port_id); - match created_peer_port_index { - Some(created_peer_port_index) => { - // Addendum to the above comment: but that port is also - // moving to the new component - let peer_pair = &opened_port_id_pairs[created_peer_port_index]; - created_port_info.peer_port_id = peer_pair.created_id; - created_port_info.peer_comp_id = reservation.id(); - todo!("either add 'self peer', or remove that idea from Ctx altogether");` - }, - None => { - // Peer port remains with creator component. - created_port_info.peer_comp_id = creator_ctx.id; - created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(creator_ctx.id)); - } - } - } else { - // Peer is a different component. We'll deal with sending the - // appropriate messages later - let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id); - let peer_info = creator_ctx.get_peer(peer_handle); - created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id)); - created_component_has_remote_peers = true; - } - } - - // We'll now actually turn our reservation for a new component into an - // actual component. Note that we initialize it as "not sleeping" as - // its initial scheduling might be performed based on `Ack`s in response - // to message exchanges between remote peers. - let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len(); - let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports); - let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( - reservation, component, created_ctx, false, - ); - component.component.on_creation(created_key.downgrade(), sched_ctx); - - // Now modify the creator's ports: remove every transferred port and - // potentially remove the peer component. - for pair in opened_port_id_pairs.iter() { - // Remove peer if appropriate - let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); - creator_ctx.change_port_peer(sched_ctx, pair.creator_handle, None); - creator_ctx.remove_port(pair.creator_handle); - - // Transfer any messages - if let Some(mut message) = self.inbox_main.remove(creator_port_index) { - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message) - } - - let mut message_index = 0; - while message_index < self.inbox_backup.len() { - let message = &self.inbox_backup[message_index]; - if message.data_header.target_port == pair.creator_id { - // transfer message - let mut message = self.inbox_backup.remove(message_index); - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message); - } else { - message_index += 1; - } - } - - let created_port_info = component.ctx.get_port(pair.created_handle); - if created_port_info.peer_comp_id == creator_ctx.id { - // This handles the creation of a channel between the creator - // component and the newly created component. So if the creator - // had a `a -> b` channel, and `b` is moved to the new - // component, then `a` needs to set its peer component. - let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id); - let peer_port_info = creator_ctx.get_port_mut(peer_port_handle); - peer_port_info.peer_comp_id = component.ctx.id; - peer_port_info.peer_port_id = created_port_info.self_id; - creator_ctx.change_port_peer(sched_ctx, peer_port_handle, Some(component.ctx.id)); - } - } - - // Do the same for the closed ports. Note that we might still have to - // transfer messages that cause the new owner of the port to fail. - for pair in closed_port_id_pairs.iter() { - let port_index = creator_ctx.get_port_index(pair.creator_handle); - creator_ctx.remove_port(pair.creator_handle); - if let Some(mut message) = self.inbox_main.remove(port_index) { - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message); - } - - let mut message_index = 0; - while message_index < self.inbox_backup.len() { - let message = &self.inbox_backup[message_index]; - if message.data_header.target_port == pair.created_id { - // Transfer message - let mut message = self.inbox_backup.remove(message_index); - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message); - } else { - message_index += 1; - } - } - } - - // By now all ports and messages have been transferred. If there are any - // peers that need to be notified about this new component, then we - // initiate the protocol that will notify everyone here. - if created_component_has_remote_peers { - let created_ctx = &component.ctx; - let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); - for pair in opened_port_id_pairs.iter() { - let port_info = created_ctx.get_port(pair.created_handle); - if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id { - let message = self.control.add_reroute_entry( - creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id, - pair.creator_id, pair.created_id, created_ctx.id, - schedule_entry_id - ); - let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id); - let peer_info = created_ctx.get_peer(peer_handle); - peer_info.handle.send_message_logged(sched_ctx, message, true); - } - } - } else { - // Peer can be scheduled immediately - sched_ctx.runtime.enqueue_work(created_key); - } - } -} - -/// Recursively goes through the value group, attempting to find ports. -/// Duplicates will only be added once. -pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { - // Helper to check a value for a port and recurse if needed. - fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { - match value { - Value::Input(port_id) | Value::Output(port_id) => { - // This is an actual port - let cur_port = PortId(port_id.id); - for prev_port in ports.iter() { - if *prev_port == cur_port { - // Already added - return; - } - } - - ports.push(cur_port); - }, - Value::Array(heap_pos) | - Value::Message(heap_pos) | - Value::String(heap_pos) | - Value::Struct(heap_pos) | - Value::Union(_, heap_pos) => { - // Reference to some dynamic thing which might contain ports, - // so recurse - let heap_region = &group.regions[*heap_pos as usize]; - for embedded_value in heap_region { - find_port_in_value(group, embedded_value, ports); - } - }, - _ => {}, // values we don't care about - } - } - - // Clear the ports, then scan all the available values - ports.clear(); - for value in &value_group.values { - find_port_in_value(value_group, value, ports); - } -} - -struct ValueGroupPortIter<'a> { - group: &'a mut ValueGroup, - heap_stack: Vec<(usize, usize)>, - index: usize, -} - -impl<'a> ValueGroupPortIter<'a> { - fn new(group: &'a mut ValueGroup) -> Self { - return Self{ group, heap_stack: Vec::new(), index: 0 } - } -} - -struct ValueGroupPortRef { - id: PortId, - heap_pos: Option, // otherwise: on stack - index: usize, -} - -impl<'a> Iterator for ValueGroupPortIter<'a> { - type Item = ValueGroupPortRef; - - fn next(&mut self) -> Option { - // Enter loop that keeps iterating until a port is found - loop { - if let Some(pos) = self.heap_stack.last() { - let (heap_pos, region_index) = *pos; - if region_index >= self.group.regions[heap_pos].len() { - self.heap_stack.pop(); - continue; - } - - let value = &self.group.regions[heap_pos][region_index]; - self.heap_stack.last_mut().unwrap().1 += 1; - - match value { - Value::Input(id) | Value::Output(id) => { - let id = PortId(id.id); - return Some(ValueGroupPortRef{ - id, - heap_pos: Some(heap_pos), - index: region_index, - }); - }, - _ => {}, - } - - if let Some(heap_pos) = value.get_heap_pos() { - self.heap_stack.push((heap_pos as usize, 0)); - } - } else { - if self.index >= self.group.values.len() { - return None; - } - - let value = &mut self.group.values[self.index]; - self.index += 1; - - match value { - Value::Input(id) | Value::Output(id) => { - let id = PortId(id.id); - return Some(ValueGroupPortRef{ - id, - heap_pos: None, - index: self.index - 1 - }); - }, - _ => {}, - } - - // Not a port, check if we need to enter a heap region - if let Some(heap_pos) = value.get_heap_pos() { - self.heap_stack.push((heap_pos as usize, 0)); - } // else: just consider the next value - } - } - } } \ No newline at end of file diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs index b48f13b5b8e17d7f777e420aae1eeb5c34f8a324..27ec3cd96ae08c6d4aa4bac672cd76742eb63f70 100644 --- a/src/runtime2/component/component_random.rs +++ b/src/runtime2/component/component_random.rs @@ -2,14 +2,10 @@ use rand::prelude as random; use rand::RngCore; use crate::protocol::eval::{ValueGroup, Value}; -use crate::runtime2::*; +use crate::runtime2::communication::*; use super::*; -use super::component::{ - self, - Component, CompExecState, CompScheduling, - CompMode, ExitReason -}; +use super::component::*; use super::control_layer::*; use super::consensus::*; @@ -28,6 +24,8 @@ pub struct ComponentRandomU32 { did_perform_send: bool, // when in sync mode control: ControlLayer, consensus: Consensus, + inbox_main: InboxMain, // not used + inbox_backup: InboxBackup, // not used } impl Component for ComponentRandomU32 { @@ -51,7 +49,7 @@ impl Component for ComponentRandomU32 { Message::Control(message) => { if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, - message, sched_ctx, comp_ctx + message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup ) { component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); } @@ -64,7 +62,12 @@ impl Component for ComponentRandomU32 { sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); match self.exec_state.mode { - CompMode::BlockedGet | CompMode::BlockedSelect => { + CompMode::BlockedGet | + CompMode::BlockedSelect | + CompMode::PutPortsBlockedTransferredPorts | + CompMode::PutPortsBlockedAwaitingAcks | + CompMode::PutPortsBlockedSendingPort | + CompMode::NewComponentBlocked => { // impossible for this component, no input ports and no select // blocks unreachable!(); @@ -104,7 +107,8 @@ impl Component for ComponentRandomU32 { let send_result = component::default_send_data_message( &mut self.exec_state, self.output_port_id, PortInstruction::NoSource, value_group, - sched_ctx, &mut self.consensus, comp_ctx + sched_ctx, &mut self.consensus, &mut self.control, + comp_ctx ); if let Err(location_and_message) = send_result { @@ -157,6 +161,8 @@ impl ComponentRandomU32 { did_perform_send: false, control: ControlLayer::default(), consensus: Consensus::new(), + inbox_main: Vec::new(), + inbox_backup: Vec::new(), } } } \ No newline at end of file diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index 764963a2e79f5b41c55d91bbf8ffe7123ad18ae4..dc11dd2c27b065ab112be629ac1ce3d31144a575 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -355,6 +355,20 @@ impl Consensus { self.solution.clear(); } + pub(crate) fn notify_received_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) { + debug_assert_eq!(_expected_index, self.ports.len()); + let port_info = comp_ctx.get_port(port_handle); + self.ports.push(PortAnnotation{ + self_comp_id: comp_ctx.id, + self_port_id: port_info.self_id, + peer_comp_id: port_info.peer_comp_id, + peer_port_id: port_info.peer_port_id, + peer_discovered: false, + mapping: None, + kind: port_info.kind, + }); + } + // ------------------------------------------------------------------------- // Handling inbound and outbound messages // ------------------------------------------------------------------------- @@ -366,7 +380,10 @@ impl Consensus { let data_header = self.create_data_header_and_update_mapping(port_info); let sync_header = self.create_sync_header(comp_ctx); - return DataMessage{ data_header, sync_header, content }; + return DataMessage{ + data_header, sync_header, content, + ports: Vec::new() + }; } /// Handles the arrival of a new data message (needs to be called for every diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index b7ad2c40b907deac5b614da1e0cba99181993434..9eabc0cd5f0534f877c7251c46b1296ac11bfbf4 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -5,12 +5,12 @@ use crate::runtime2::component::*; use super::component_context::*; #[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub(crate) struct ControlId(u32); +pub(crate) struct ControlId(pub(crate) u32); impl ControlId { /// Like other invalid IDs, this one doesn't care any significance, but is /// just set at u32::MAX to hopefully bring out bugs sooner. - fn new_invalid() -> Self { + pub(crate) fn new_invalid() -> Self { return ControlId(u32::MAX); } } @@ -25,6 +25,7 @@ enum ControlContent { PeerChange(ContentPeerChange), ScheduleComponent(CompId), ClosedPort(PortId), + UnblockPutWithPorts } struct ContentPeerChange { @@ -45,6 +46,7 @@ pub(crate) enum AckAction { None, SendMessage(CompId, ControlMessage), ScheduleComponent(CompId), + UnblockPutWithPorts, } /// Handling/sending control messages. @@ -128,6 +130,9 @@ impl ControlLayer { comp_ctx.change_port_peer(sched_ctx, port_handle, None); return (AckAction::None, None); + }, + ControlContent::UnblockPutWithPorts => { + return (AckAction::UnblockPutWithPorts, None); } } } @@ -137,7 +142,7 @@ impl ControlLayer { } // ------------------------------------------------------------------------- - // Port transfer (due to component creation) + // Port transfer // ------------------------------------------------------------------------- /// Adds an entry that, when completely ack'd, will schedule a component. @@ -152,15 +157,20 @@ impl ControlLayer { return entry_id; } - /// Removes a schedule entry. Only used if the caller preemptively called - /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`, - /// hence the `ack_countdown` in the scheduling entry is at 0. - pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) { - let index = self.get_entry_index_by_id(schedule_entry_id).unwrap(); - debug_assert_eq!(self.entries[index].ack_countdown, 0); - self.entries.remove(index); + /// Adds an entry that returns the similarly named Ack action + pub(crate) fn add_unblock_put_with_ports_entry(&mut self) -> ControlId { + let entry_id = self.take_id(); + self.entries.push(ControlEntry{ + id: entry_id, + ack_countdown: 0, // incremented by calls to `add_reroute_entry` + content: ControlContent::UnblockPutWithPorts, + }); + + return entry_id; } + /// Adds a rerouting entry (used to ensure all messages will end up at a + /// newly created component). Used when creating a new component. pub(crate) fn add_reroute_entry( &mut self, creator_comp_id: CompId, source_port_id: PortId, source_comp_id: CompId, @@ -193,6 +203,24 @@ impl ControlLayer { }) } + /// Creates a PortPeerChanged message (and increments ack-counter on a + /// pre-created control entry) that is used as a preliminary step before + /// transferring a port over a channel. + pub(crate) fn create_port_transfer_message( + &mut self, associated_control_id: ControlId, + sender_comp_id: CompId, target_port_id: PortId + ) -> Message { + let entry_index = self.get_entry_index_by_id(associated_control_id).unwrap(); + self.entries[entry_index].ack_countdown += 1; + + return Message::Control(ControlMessage{ + id: associated_control_id, + sender_comp_id, + target_port_id: Some(target_port_id), + content: ControlMessageContent::PortPeerChangedBlock + }) + } + // ------------------------------------------------------------------------- // Blocking, unblocking, and closing ports // ------------------------------------------------------------------------- diff --git a/src/runtime2/component/mod.rs b/src/runtime2/component/mod.rs index e64f6bd1b2674b57f3bb68a2a470c5f51c2f3ebc..7dc79f4287745a0f7fcb5bb7a2942228b1b33370 100644 --- a/src/runtime2/component/mod.rs +++ b/src/runtime2/component/mod.rs @@ -8,7 +8,7 @@ mod component_internet; pub(crate) use component::{Component, CompScheduling}; pub(crate) use component_pdl::{CompPDL}; -pub(crate) use component_context::{CompCtx, PortInstruction}; +pub(crate) use component_context::{CompCtx, PortInstruction, PortKind, PortState, PortStateFlag}; pub(crate) use control_layer::{ControlId}; use super::scheduler::*; diff --git a/src/runtime2/error.rs b/src/runtime2/error.rs index 4982a3230d43bbb8dcfe4f2fc31913b84b98a46b..a222e98581178228c4960516a04e5b21a5dc1e49 100644 --- a/src/runtime2/error.rs +++ b/src/runtime2/error.rs @@ -1,4 +1,4 @@ -use std::fmt::{Write, Debug, Display, Formatter as FmtFormatter, Result as FmtResult}; +use std::fmt::{Debug, Display, Formatter as FmtFormatter, Result as FmtResult}; /// Represents an unrecoverable runtime error that is reported to the user (for /// debugging purposes). Basically a human-readable message with its source diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs index 2fc67d21192fb736ee7a12b58581c0f86bc42aaf..86225ca65fe06aead14e60095c4592e216bf3638 100644 --- a/src/runtime2/poll/mod.rs +++ b/src/runtime2/poll/mod.rs @@ -177,6 +177,7 @@ impl PollingThread { } pub(crate) fn run(&mut self) { + use std::io::ErrorKind; use crate::runtime2::communication::Message; const NUM_EVENTS: usize = 256; @@ -191,7 +192,23 @@ impl PollingThread { loop { // Retrieve events first (because the PollingClient will first // register at epoll, and then push a command into the queue). - self.poller.wait(&mut events, EPOLL_DURATION).unwrap(); + loop { + let wait_result = self.poller.wait(&mut events, EPOLL_DURATION); + match wait_result { + Ok(()) => break, + Err(reason) => { + match reason.kind() { + ErrorKind::Interrupted => { + // Happens when we're debugging and set a break- + // point, we want to continue waiting + }, + _ => { + panic!("failed to poll: {}", reason); + } + } + } + } + } // Then handle everything in the command queue. while let Some(command) = self.queue.pop() { diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index bf954b4cdea2ede7f5efb75fef397e823e46f962..7cbce4137cf2d6c188236378ba877df3e45e5461 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -74,7 +74,8 @@ pub(crate) struct RuntimeComp { pub exiting: bool, } -/// Should contain everything that is accessible in a thread-safe manner +/// Should contain everything that is accessible in a thread-safe manner. May +/// NOT contain non-threadsafe fields. // TODO: Do something about the `num_handles` thing. This needs to be a bit more // "foolproof" to lighten the mental burden of using the `num_handles` // variable. @@ -95,12 +96,19 @@ pub(crate) struct CompHandle { } impl CompHandle { - fn new(id: CompId, public: &CompPublic) -> CompHandle { - let handle = CompHandle{ + /// Creates a new component handle and does not increment the reference + /// counter. + fn new_unincremented(id: CompId, public: &CompPublic) -> CompHandle { + return CompHandle{ target: public, id, #[cfg(debug_assertions)] decremented: false, }; + } + + /// Creates a new component handle and increments the reference counter. + fn new(id: CompId, public: &CompPublic) -> CompHandle { + let mut handle = Self::new_unincremented(id, public); handle.increment_users(); return handle; } @@ -232,10 +240,10 @@ impl Runtime { module_name, routine_name, ValueGroup::new_stack(Vec::new()) )?; - let reserved = self.inner.start_create_pdl_component(); + let reserved = self.inner.start_create_component(); let ctx = CompCtx::new(&reserved); let component = Box::new(CompPDL::new(prompt, 0)); - let (key, _) = self.inner.finish_create_pdl_component(reserved, component, ctx, false); + let (key, _) = self.inner.finish_create_component(reserved, component, ctx, false); self.inner.enqueue_work(key); return Ok(()) @@ -284,22 +292,24 @@ impl RuntimeInner { // Creating/destroying components - pub(crate) fn start_create_pdl_component(&self) -> CompReserved { + pub(crate) fn start_create_component(&self) -> CompReserved { self.increment_active_components(); let reservation = self.components.reserve(); return CompReserved{ reservation }; } - pub(crate) fn finish_create_pdl_component( + pub(crate) fn finish_create_component( &self, reserved: CompReserved, component: Box, mut context: CompCtx, initially_sleeping: bool, ) -> (CompKey, &mut RuntimeComp) { + // Construct runtime component let inbox_queue = QueueDynMpsc::new(16); let inbox_producer = inbox_queue.producer(); - let _id = reserved.id(); - context.id = reserved.id(); - let component = RuntimeComp { + let component_id = reserved.id(); + context.id = component_id; + + let mut component = RuntimeComp { public: CompPublic{ sleeping: AtomicBool::new(initially_sleeping), num_handles: AtomicU32::new(1), // the component itself acts like a handle @@ -311,10 +321,17 @@ impl RuntimeInner { exiting: false, }; + // Submit created component into storage. let index = self.components.submit(reserved.reservation, component); - debug_assert_eq!(index, _id.0); + debug_assert_eq!(index, component_id.0); let component = self.components.get_mut(index); + // Bit messy, but here we create the reference of a component to itself, + // the `num_handles` being initialized to `1` above, and add it to the + // component context. + let self_handle = CompHandle::new_unincremented(component_id, &component.public); + component.ctx.add_self_reference(self_handle); + return (CompKey(index), component); } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 4aea4d03e2928f658de0827159de084f48b5e5dc..491e9f411457e73f065101ee360593593b2cdc71 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -89,7 +89,7 @@ impl Scheduler { CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); }, CompScheduling::Exit => { component.component.on_shutdown(&scheduler_ctx); - self.mark_component_as_exiting(&scheduler_ctx, component); + self.mark_component_as_exiting(&scheduler_ctx, comp_key, component); } } } @@ -120,16 +120,14 @@ impl Scheduler { /// Marks the component as exiting by removing the reference it holds to /// itself. Afterward the component will enter "normal" sleeping mode (if it /// has not yet been destroyed) - fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) { + fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, comp_key: CompKey, component: &mut RuntimeComp) { // If we didn't yet decrement our reference count, do so now - let comp_key = unsafe{ component.ctx.id.upgrade() }; - if !component.exiting { component.exiting = true; - let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel); - let new_count = old_count - 1; - if new_count == 0 { + let maybe_comp_key = component.ctx.remove_self_reference(); + if let Some(_comp_key) = maybe_comp_key { + debug_assert_eq!(_comp_key.0, comp_key.0); sched_ctx.runtime.destroy_component(comp_key); return; } diff --git a/src/runtime2/tests/messaging.rs b/src/runtime2/tests/messaging.rs new file mode 100644 index 0000000000000000000000000000000000000000..fc4a0dd65f241980bcc60e866ee705620648a501 --- /dev/null +++ b/src/runtime2/tests/messaging.rs @@ -0,0 +1,119 @@ +use super::*; + + +#[test] +fn test_component_communication() { + let pd = ProtocolDescription::parse(b" + primitive sender(out o, u32 outside_loops, u32 inside_loops) { + u32 outside_index = 0; + while (outside_index < outside_loops) { + u32 inside_index = 0; + sync while (inside_index < inside_loops) { + put(o, inside_index); + inside_index += 1; + } + outside_index += 1; + } + } + + primitive receiver(in i, u32 outside_loops, u32 inside_loops) { + u32 outside_index = 0; + while (outside_index < outside_loops) { + u32 inside_index = 0; + sync while (inside_index < inside_loops) { + auto val = get(i); + while (val != inside_index) {} // infinite loop if incorrect value is received + inside_index += 1; + } + outside_index += 1; + } + } + + composite constructor() { + channel o_orom -> i_orom; + channel o_mrom -> i_mrom; + channel o_ormm -> i_ormm; + channel o_mrmm -> i_mrmm; + + // one round, one message per round + new sender(o_orom, 1, 1); + new receiver(i_orom, 1, 1); + + // multiple rounds, one message per round + new sender(o_mrom, 5, 1); + new receiver(i_mrom, 5, 1); + + // one round, multiple messages per round + new sender(o_ormm, 1, 5); + new receiver(i_ormm, 1, 5); + + // multiple rounds, multiple messages per round + new sender(o_mrmm, 5, 5); + new receiver(i_mrmm, 5, 5); + }").expect("compilation"); + let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap(); + create_component(&rt, "", "constructor", no_args()); +} + +#[test] +fn test_send_to_self() { + compile_and_create_component(" + primitive insane_in_the_membrane() { + channel a -> b; + sync { + put(a, 1); + auto v = get(b); + while (v != 1) {} + } + } + ", "insane_in_the_membrane", no_args()); +} + +#[test] +fn test_intermediate_messenger() { + let pd = ProtocolDescription::parse(b" + primitive receiver(in rx, u32 num) { + auto index = 0; + while (index < num) { + sync { auto v = get(rx); } + index += 1; + } + } + + primitive middleman(in rx, out tx, u32 num) { + auto index = 0; + while (index < num) { + sync { put(tx, get(rx)); } + index += 1; + } + } + + primitive sender(out tx, u32 num) { + auto index = 0; + while (index < num) { + sync put(tx, 1337); + index += 1; + } + } + + composite constructor_template() { + auto num = 0; + channel tx_a -> rx_a; + channel tx_b -> rx_b; + new sender(tx_a, 3); + new middleman(rx_a, tx_b, 3); + new receiver(rx_b, 3); + } + + composite constructor() { + new constructor_template(); + new constructor_template(); + new constructor_template(); + new constructor_template(); + new constructor_template(); + new constructor_template(); + } + ").expect("compilation"); + let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap(); + create_component(&rt, "", "constructor", no_args()); +} diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 6baa362a0c2dd9fc572b89a4c7b005151ed2f6ac..85572c586c717aafef2405f6d35172c9dfe4b791 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -3,10 +3,12 @@ use crate::protocol::eval::*; use crate::runtime2::runtime::*; use crate::runtime2::component::{CompCtx, CompPDL}; +mod messaging; mod error_handling; +mod transfer_ports; const LOG_LEVEL: LogLevel = LogLevel::Debug; -const NUM_THREADS: u32 = 4; +const NUM_THREADS: u32 = 1; pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) { let protocol = ProtocolDescription::parse(source.as_bytes()) @@ -20,10 +22,10 @@ pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &s let prompt = rt.inner.protocol.new_component( module_name.as_bytes(), routine_name.as_bytes(), args ).expect("create prompt"); - let reserved = rt.inner.start_create_pdl_component(); + let reserved = rt.inner.start_create_component(); let ctx = CompCtx::new(&reserved); let component = Box::new(CompPDL::new(prompt, 0)); - let (key, _) = rt.inner.finish_create_pdl_component(reserved, component, ctx, false); + let (key, _) = rt.inner.finish_create_component(reserved, component, ctx, false); rt.inner.enqueue_work(key); } @@ -44,109 +46,6 @@ fn test_component_creation() { } } -#[test] -fn test_component_communication() { - let pd = ProtocolDescription::parse(b" - primitive sender(out o, u32 outside_loops, u32 inside_loops) { - u32 outside_index = 0; - while (outside_index < outside_loops) { - u32 inside_index = 0; - sync while (inside_index < inside_loops) { - put(o, inside_index); - inside_index += 1; - } - outside_index += 1; - } - } - - primitive receiver(in i, u32 outside_loops, u32 inside_loops) { - u32 outside_index = 0; - while (outside_index < outside_loops) { - u32 inside_index = 0; - sync while (inside_index < inside_loops) { - auto val = get(i); - while (val != inside_index) {} // infinite loop if incorrect value is received - inside_index += 1; - } - outside_index += 1; - } - } - - composite constructor() { - channel o_orom -> i_orom; - channel o_mrom -> i_mrom; - channel o_ormm -> i_ormm; - channel o_mrmm -> i_mrmm; - - // one round, one message per round - new sender(o_orom, 1, 1); - new receiver(i_orom, 1, 1); - - // multiple rounds, one message per round - new sender(o_mrom, 5, 1); - new receiver(i_mrom, 5, 1); - - // one round, multiple messages per round - new sender(o_ormm, 1, 5); - new receiver(i_ormm, 1, 5); - - // multiple rounds, multiple messages per round - new sender(o_mrmm, 5, 5); - new receiver(i_mrmm, 5, 5); - }").expect("compilation"); - let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap(); - create_component(&rt, "", "constructor", no_args()); -} - -#[test] -fn test_intermediate_messenger() { - let pd = ProtocolDescription::parse(b" - primitive receiver(in rx, u32 num) { - auto index = 0; - while (index < num) { - sync { auto v = get(rx); } - index += 1; - } - } - - primitive middleman(in rx, out tx, u32 num) { - auto index = 0; - while (index < num) { - sync { put(tx, get(rx)); } - index += 1; - } - } - - primitive sender(out tx, u32 num) { - auto index = 0; - while (index < num) { - sync put(tx, 1337); - index += 1; - } - } - - composite constructor_template() { - auto num = 0; - channel tx_a -> rx_a; - channel tx_b -> rx_b; - new sender(tx_a, 3); - new middleman(rx_a, tx_b, 3); - new receiver(rx_b, 3); - } - - composite constructor() { - new constructor_template(); - new constructor_template(); - new constructor_template(); - new constructor_template(); - new constructor_template(); - new constructor_template(); - } - ").expect("compilation"); - let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap(); - create_component(&rt, "", "constructor", no_args()); -} - #[test] fn test_simple_select() { let pd = ProtocolDescription::parse(b" diff --git a/src/runtime2/tests/transfer_ports.rs b/src/runtime2/tests/transfer_ports.rs new file mode 100644 index 0000000000000000000000000000000000000000..e84a5b2bb5880230bc0048c5bee8ccd04270a2c5 --- /dev/null +++ b/src/runtime2/tests/transfer_ports.rs @@ -0,0 +1,181 @@ +use super::*; + +#[test] +fn test_transfer_precreated_port_with_owned_peer() { + compile_and_create_component(" + primitive port_sender(out> tx) { + channel a -> b; + sync put(tx, b); + } + + primitive port_receiver(in> rx) { + sync auto a = get(rx); + } + + composite constructor() { + channel a -> b; + new port_sender(a); + new port_receiver(b); + } + ", "constructor", no_args()); +} + +#[test] +fn test_transfer_precreated_port_with_foreign_peer() { + compile_and_create_component(" + primitive port_sender(out> tx, in to_send) { + sync put(tx, to_send); + } + + primitive port_receiver(in> rx) { + sync auto a = get(rx); + } + + composite constructor() { + channel tx -> rx; + channel forgotten -> to_send; + new port_sender(tx, to_send); + new port_receiver(rx); + } + ", "constructor", no_args()); +} + +#[test] +fn test_transfer_synccreated_port() { + compile_and_create_component(" + primitive port_sender(out> tx) { + sync { + channel a -> b; + put(tx, b); + } + } + + primitive port_receiver(in> rx) { + sync auto a = get(rx); + } + + composite constructor() { + channel a -> b; + new port_sender(a); + new port_receiver(b); + } + ", "constructor", no_args()); +} + +#[test] +fn test_transfer_precreated_port_with_owned_peer_and_communication() { + compile_and_create_component(" + primitive port_sender(out> tx) { + channel a -> b; + sync put(tx, b); + sync put(a, 1337); + } + + primitive port_receiver(in> rx) { + channel a -> b; // this is stupid, but we need to have a variable to use + sync b = get(rx); + u32 value = 0; + sync value = get(b); + while (value != 1337) {} + } + composite constructor() { + channel a -> b; + new port_sender(a); + new port_receiver(b); + } + ", "constructor", no_args()); +} + +#[test] +fn test_transfer_precreated_port_with_foreign_peer_and_communication() { + compile_and_create_component(" + primitive port_sender(out> tx, in to_send) { + sync put(tx, to_send); + } + + primitive message_transmitter(out tx) { + sync put(tx, 1337); + } + + primitive port_receiver(in> rx) { + channel unused -> b; + sync b = get(rx); + u32 value = 0; + sync value = get(b); + while (value != 1337) {} + } + + composite constructor() { + channel port_tx -> port_rx; + channel value_tx -> value_rx; + new port_sender(port_tx, value_rx); + new port_receiver(port_rx); + new message_transmitter(value_tx); + } + ", "constructor", no_args()); +} + +#[test] +fn test_transfer_precreated_port_with_owned_peer_back_and_forth() { + compile_and_create_component(" + primitive port_send_and_receive(out> tx, in> rx) { + channel a -> b; + sync { + put(tx, b); + b = get(rx); + } + } + + primitive port_receive_and_send(in> rx, out> tx) { + channel unused -> transferred; // same problem as in different tests + sync { + transferred = get(rx); + put(tx, transferred); + } + } + + composite constructor() { + channel port_tx_forward -> port_rx_forward; + channel port_tx_backward -> port_rx_backward; + + new port_send_and_receive(port_tx_forward, port_rx_backward); + new port_receive_and_send(port_rx_forward, port_tx_backward); + }", "constructor", no_args()); +} + +#[test] +fn test_transfer_precreated_port_with_foreign_peer_back_and_forth_and_communication() { + compile_and_create_component(" + primitive port_send_and_receive(out> tx, in> rx, in to_transfer) { + sync { + put(tx, to_transfer); + to_transfer = get(rx); + } + sync { + auto value = get(to_transfer); + while (value != 1337) {} + } + } + + primitive port_receive_and_send(in> rx, out> tx) { + channel unused -> transferred; + sync { + transferred = get(rx); + put(tx, transferred); + } + } + + primitive value_sender(out tx) { + sync put(tx, 1337); + } + + composite constructor() { + channel port_tx_forward -> port_rx_forward; + channel port_tx_backward -> port_rx_backward; + channel message_tx -> message_rx; + new port_send_and_receive(port_tx_forward, port_rx_backward, message_rx); + new port_receive_and_send(port_rx_forward, port_tx_backward); + new value_sender(message_tx); + } + ", "constructor", no_args()); +} \ No newline at end of file