diff --git a/src/protocol/parser/mod.rs b/src/protocol/parser/mod.rs index 8f1500639951df5c37527325b710f7e2f3e88097..b3c7dd4b763992e145130a42b861333a4122ae3a 100644 --- a/src/protocol/parser/mod.rs +++ b/src/protocol/parser/mod.rs @@ -327,7 +327,13 @@ impl Parser { // Make sure directory exists let path = Path::new(&base_path); if !path.exists() { - return Err(format!("std lib root directory '{}' does not exist", base_path)); + let from_env_message = if from_env { + format!(" (retrieved from the environment variable '{}')", REOWOLF_PATH_ENV) + } else { + String::new() + }; + + return Err(format!("std lib root directory '{}'{} does not exist", base_path, from_env_message)); } // Try to load all standard library files. We might need a more unified diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 7e450523700cc9aebe4153b98da061478f4cc1c5..94a11291b2605b9209da0974ff3a32044e0a32b0 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -4,7 +4,6 @@ use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortI use crate::protocol::*; use crate::runtime2::*; use crate::runtime2::communication::*; -use crate::runtime2::component::component_pdl::find_ports_in_value_group; use super::{CompCtx, CompPDL, CompId}; use super::component_context::*; @@ -234,7 +233,8 @@ pub(crate) fn create_component( pub(crate) fn default_send_data_message( exec_state: &mut CompExecState, transmitting_port_id: PortId, port_instruction: PortInstruction, value: ValueGroup, - sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx + sched_ctx: &SchedulerCtx, consensus: &mut Consensus, + control: &mut ControlLayer, comp_ctx: &mut CompCtx ) -> Result { debug_assert_eq!(exec_state.mode, CompMode::Sync); @@ -262,33 +262,21 @@ pub(crate) fn default_send_data_message( let mut ports = Vec::new(); find_ports_in_value_group(&value, &mut ports); if !ports.is_empty() { + prepare_send_message_with_ports( + transmitting_port_id, port_instruction, value, exec_state, + comp_ctx, sched_ctx, control + )?; + return Ok(CompScheduling::Sleep); + } else { + // Port is not blocked and no ports to transfer: send to the peer + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); - for (value_location, port_id) in ports { - let transmitted_port_handle = comp_ctx.get_port_handle(port_id); - let transmitted_port = comp_ctx.get_port(transmitted_port_handle); - - if transmitted_port.state.is_set(PortStateFlag::Transmitted) { - // Note: We could also attach where the port has been - // transferred - return Err(( - port_info.last_instruction, - String::from("Cannot send this message, as it contains a previously transmitted port") - )); - } - - // Prepare ack for PPC - // Prepare PPC message - } + return Ok(CompScheduling::Immediate); } - - // Port is not blocked, so send to the peer - let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); - let peer_info = comp_ctx.get_peer(peer_handle); - let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); - peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); - - return Ok(CompScheduling::Immediate); } } @@ -416,21 +404,20 @@ pub(crate) fn default_attempt_get( /// of full buffers (hence, will use the control layer to make sure the peer /// will become unblocked). pub(crate) fn default_handle_received_data_message( - targeted_port: PortId, port_instruction: PortInstruction, message: &mut DataMessage, + targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> Result<(), (PortInstruction, String)> { let port_handle = comp_ctx.get_port_handle(targeted_port); let port_index = comp_ctx.get_port_index(port_handle); - let slot = &mut inbox_main[port_index]; - debug_assert!(slot.is_none()); // because we've just received from it + debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it // If we received any ports, add them to the port tracking and inbox struct. // Then notify the peers that they can continue sending to this port, but // now at a new address. for received_port in &mut message.ports { // Transfer messages to main/backup inbox - let new_inbox_index = inbox_main.len(); + let _new_inbox_index = inbox_main.len(); if !received_port.messages.is_empty() { inbox_main.push(Some(received_port.messages.remove(0))); } @@ -443,11 +430,12 @@ pub(crate) fn default_handle_received_data_message( received_port.peer_comp, received_port.peer_port, received_port.kind, new_port_state ); + debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle)); + comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp)); let new_port = comp_ctx.get_port(new_port_handle); - comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(new_port.peer_comp_id)); // Replace all references to the port in the received message - for message_location in received_port.locations { + for message_location in received_port.locations.iter().copied() { let value = match message_location { ValueId::Heap(heap_pos, heap_index) => &mut message.content.regions[heap_pos as usize][heap_index as usize], ValueId::Stack(stack_index) => &mut message.content.values[stack_index as usize], @@ -474,7 +462,7 @@ pub(crate) fn default_handle_received_data_message( id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(new_port.peer_port_id), - content: ControlMessageContent::PortPeerChangedUnblock(new_port.id, comp_ctx.id) + content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id) }), true); } @@ -490,7 +478,7 @@ pub(crate) fn default_handle_received_data_message( // One more message, place it in the slot let message = inbox_backup.remove(message_index); debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup - *slot = Some(message); + inbox_main[port_index] = Some(message); return Ok(()); } @@ -522,7 +510,7 @@ pub(crate) fn default_handle_control_message( ) -> Result<(), (PortInstruction, String)> { match message.content { ControlMessageContent::Ack => { - default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, inbox_main, inbox_backup); + default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup); }, ControlMessageContent::BlockPort => { // One of our messages was accepted, but the port should be @@ -555,7 +543,7 @@ pub(crate) fn default_handle_control_message( // The two components (sender and this component) are closing // the channel at the same time. So we don't care about the // content of the `ClosePort` message. - default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, inbox_main, inbox_backup); + default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup); } else { // Respond to the message let port_info = comp_ctx.get_port(port_handle); @@ -622,7 +610,6 @@ pub(crate) fn default_handle_control_message( let port_handle = comp_ctx.get_port_handle(port_to_change); let port_info = comp_ctx.get_port(port_handle); debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange)); - let old_peer_id = port_info.peer_comp_id; let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_port_id = new_port_id; @@ -772,6 +759,7 @@ pub(crate) fn default_handle_sync_decision( if port_info.close_at_sync_end { port_info.state.set(PortStateFlag::Closed); } + port_info.state.clear(PortStateFlag::Received); } debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); exec_state.mode = CompMode::NonSync; @@ -917,8 +905,8 @@ fn perform_send_message_with_ports( // And further enhance the message by adding data about the ports that are // being transferred - for (port_locations, port_id) in transmit_ports { - let transmit_port_handle = comp_ctx.get_port_handle(port_id); + for (port_locations, transmit_port_id) in transmit_ports { + let transmit_port_handle = comp_ctx.get_port_handle(transmit_port_id); let transmit_port_info = comp_ctx.get_port(transmit_port_handle); let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup); @@ -945,8 +933,8 @@ fn perform_send_message_with_ports( /// Handles an `Ack` for the control layer. fn default_handle_ack( exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId, - sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, inbox_main: &mut InboxMain, - inbox_backup: &mut InboxBackup + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) { // Since an `Ack` may cause another one, handle them in a loop let mut to_ack = control_id; @@ -1084,7 +1072,7 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut En // Clear the ports, then scan all the available values ports.clear(); - for (value_index, value) in &value_group.values.iter().enumerate() { + for (value_index, value) in value_group.values.iter().enumerate() { find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports); } } diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index 94dff5badfa7677ddbe3d4b2425ddff7366afc0a..4ee51581b3f46d0b8c43f278b58a8b3fc7e391cb 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -196,7 +196,7 @@ impl CompCtx { return Channel{ putter_id, getter_id }; } - /// Adds a new port. Make sure to call `add_peer` afterwards. + /// Adds a new port. Make sure to call `change_peer` afterwards. pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle { let self_id = PortId(self.take_port_id()); self.ports.push(Port{ @@ -209,7 +209,28 @@ impl CompCtx { return LocalPortHandle(self_id); } - /// Removes a port. Make sure you called `remove_peer` first. + /// Adds a self-reference. Called by the runtime/scheduler + pub(crate) fn add_self_reference(&mut self, self_handle: CompHandle) { + debug_assert_eq!(self.id, self_handle.id()); + debug_assert!(self.get_peer_index_by_id(self.id).is_none()); + self.peers.push(Peer{ + id: self.id, + num_associated_ports: 0, + handle: self_handle + }); + } + + /// Removes a self-reference. Called by the runtime/scheduler + pub(crate) fn remove_self_reference(&mut self) -> Option { + let self_index = self.get_peer_index_by_id(self.id).unwrap(); + let peer = &mut self.peers[self_index]; + let maybe_comp_key = peer.handle.decrement_users(); + self.peers.remove(self_index); + + return maybe_comp_key; + } + + /// Removes a port. Make sure you called `change_peer` first. pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port { let port_index = self.must_get_port_index(port_handle); let port = self.ports.remove(port_index); diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index c6fa23e95c3e2a1622231c5f38d284391ae3dfc7..23c5bc24fb5e56fab02d3287b93f6d8a81d5b536 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -110,7 +110,7 @@ impl Component for ComponentTcpClient { Message::Control(message) => { if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, - message, sched_ctx, comp_ctx + message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup ) { component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); } @@ -125,7 +125,7 @@ impl Component for ComponentTcpClient { sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state)); match self.exec_state.mode { - CompMode::BlockedSelect => { + CompMode::BlockedSelect | CompMode::BlockedPutPorts => { // Not possible: we never enter this state unreachable!(); }, @@ -239,7 +239,11 @@ impl Component for ComponentTcpClient { Ok(num_received) => { self.byte_buffer.resize(num_received, 0); let message_content = self.bytes_to_data_message_content(&self.byte_buffer); - let send_result = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, message_content, sched_ctx, &mut self.consensus, comp_ctx); + let send_result = component::default_send_data_message( + &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, + message_content, sched_ctx, &mut self.consensus, &mut self.control, comp_ctx + ); + if let Err(location_and_message) = send_result { component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); return CompScheduling::Immediate; diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 87aa4e0183237eb260948b5442ad319e291f09ed..b9ea01cc12e8b5278cdd85698c98e7ff25dae917 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -218,7 +218,7 @@ pub(crate) struct CompPDL { // Should be same length as the number of ports. Corresponding indices imply // message is intended for that port. pub inbox_main: InboxMain, - pub inbox_backup: Vec, + pub inbox_backup: InboxBackup, } impl Component for CompPDL { @@ -257,7 +257,7 @@ impl Component for CompPDL { Message::Control(message) => { if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, - message, sched_ctx, comp_ctx + message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup ) { self.handle_generic_component_error(sched_ctx, location_and_message); } @@ -282,7 +282,8 @@ impl Component for CompPDL { CompMode::NonSync | CompMode::Sync => { // continue and run PDL code }, - CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => { + CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | + CompMode::BlockedSelect | CompMode::BlockedPutPorts => { return CompScheduling::Sleep; } CompMode::StartExit => return component::default_handle_start_exit( @@ -342,7 +343,7 @@ impl Component for CompPDL { let send_result = component::default_send_data_message( &mut self.exec_state, target_port_id, PortInstruction::SourceLocation(expr_id), value, - sched_ctx, &mut self.consensus, comp_ctx + sched_ctx, &mut self.consensus, &mut self.control, comp_ctx ); if let Err(location_and_message) = send_result { self.handle_generic_component_error(sched_ctx, location_and_message); @@ -577,18 +578,18 @@ impl CompPDL { let mut opened_port_id_pairs = Vec::new(); let mut closed_port_id_pairs = Vec::new(); - let reservation = sched_ctx.runtime.start_create_pdl_component(); + let reservation = sched_ctx.runtime.start_create_component(); let mut created_ctx = CompCtx::new(&reservation); - // let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; - // let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; - // dbg_code!({ - // sched_ctx.log(&format!( - // "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", - // self_proc.identifier.value.as_str(), creator_ctx.id, - // other_proc.identifier.value.as_str(), reservation.id() - // )); - // }); + let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; + let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; + dbg_code!({ + sched_ctx.info(&format!( + "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", + self_proc.identifier.value.as_str(), creator_ctx.id, + other_proc.identifier.value.as_str(), reservation.id() + )); + }); // Take all the ports ID that are in the `args` (and currently belong to // the creator component) and translate them into new IDs that are @@ -654,7 +655,7 @@ impl CompPDL { let peer_pair = &opened_port_id_pairs[created_peer_port_index]; created_port_info.peer_port_id = peer_pair.created_id; created_port_info.peer_comp_id = reservation.id(); - todo!("either add 'self peer', or remove that idea from Ctx altogether");` + todo!("either add 'self peer', or remove that idea from Ctx altogether"); }, None => { // Peer port remains with creator component. @@ -678,7 +679,7 @@ impl CompPDL { // to message exchanges between remote peers. let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len(); let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports); - let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( + let (created_key, component) = sched_ctx.runtime.finish_create_component( reservation, component, created_ctx, false, ); component.component.on_creation(created_key.downgrade(), sched_ctx); diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs index b48f13b5b8e17d7f777e420aae1eeb5c34f8a324..b2f1fc7e1e569543728a739cddca12a4c6a0e766 100644 --- a/src/runtime2/component/component_random.rs +++ b/src/runtime2/component/component_random.rs @@ -2,14 +2,10 @@ use rand::prelude as random; use rand::RngCore; use crate::protocol::eval::{ValueGroup, Value}; -use crate::runtime2::*; +use crate::runtime2::communication::*; use super::*; -use super::component::{ - self, - Component, CompExecState, CompScheduling, - CompMode, ExitReason -}; +use super::component::*; use super::control_layer::*; use super::consensus::*; @@ -28,6 +24,8 @@ pub struct ComponentRandomU32 { did_perform_send: bool, // when in sync mode control: ControlLayer, consensus: Consensus, + inbox_main: InboxMain, // not used + inbox_backup: InboxBackup, // not used } impl Component for ComponentRandomU32 { @@ -51,7 +49,7 @@ impl Component for ComponentRandomU32 { Message::Control(message) => { if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, - message, sched_ctx, comp_ctx + message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup ) { component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); } @@ -64,7 +62,7 @@ impl Component for ComponentRandomU32 { sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); match self.exec_state.mode { - CompMode::BlockedGet | CompMode::BlockedSelect => { + CompMode::BlockedGet | CompMode::BlockedSelect | CompMode::BlockedPutPorts => { // impossible for this component, no input ports and no select // blocks unreachable!(); @@ -104,7 +102,8 @@ impl Component for ComponentRandomU32 { let send_result = component::default_send_data_message( &mut self.exec_state, self.output_port_id, PortInstruction::NoSource, value_group, - sched_ctx, &mut self.consensus, comp_ctx + sched_ctx, &mut self.consensus, &mut self.control, + comp_ctx ); if let Err(location_and_message) = send_result { @@ -157,6 +156,8 @@ impl ComponentRandomU32 { did_perform_send: false, control: ControlLayer::default(), consensus: Consensus::new(), + inbox_main: Vec::new(), + inbox_backup: Vec::new(), } } } \ No newline at end of file diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index 764963a2e79f5b41c55d91bbf8ffe7123ad18ae4..6fce74730a9d6cd923207f7239b804b1580cc005 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -366,7 +366,10 @@ impl Consensus { let data_header = self.create_data_header_and_update_mapping(port_info); let sync_header = self.create_sync_header(comp_ctx); - return DataMessage{ data_header, sync_header, content }; + return DataMessage{ + data_header, sync_header, content, + ports: Vec::new() + }; } /// Handles the arrival of a new data message (needs to be called for every diff --git a/src/runtime2/error.rs b/src/runtime2/error.rs index 4982a3230d43bbb8dcfe4f2fc31913b84b98a46b..a222e98581178228c4960516a04e5b21a5dc1e49 100644 --- a/src/runtime2/error.rs +++ b/src/runtime2/error.rs @@ -1,4 +1,4 @@ -use std::fmt::{Write, Debug, Display, Formatter as FmtFormatter, Result as FmtResult}; +use std::fmt::{Debug, Display, Formatter as FmtFormatter, Result as FmtResult}; /// Represents an unrecoverable runtime error that is reported to the user (for /// debugging purposes). Basically a human-readable message with its source diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs index 2fc67d21192fb736ee7a12b58581c0f86bc42aaf..86225ca65fe06aead14e60095c4592e216bf3638 100644 --- a/src/runtime2/poll/mod.rs +++ b/src/runtime2/poll/mod.rs @@ -177,6 +177,7 @@ impl PollingThread { } pub(crate) fn run(&mut self) { + use std::io::ErrorKind; use crate::runtime2::communication::Message; const NUM_EVENTS: usize = 256; @@ -191,7 +192,23 @@ impl PollingThread { loop { // Retrieve events first (because the PollingClient will first // register at epoll, and then push a command into the queue). - self.poller.wait(&mut events, EPOLL_DURATION).unwrap(); + loop { + let wait_result = self.poller.wait(&mut events, EPOLL_DURATION); + match wait_result { + Ok(()) => break, + Err(reason) => { + match reason.kind() { + ErrorKind::Interrupted => { + // Happens when we're debugging and set a break- + // point, we want to continue waiting + }, + _ => { + panic!("failed to poll: {}", reason); + } + } + } + } + } // Then handle everything in the command queue. while let Some(command) = self.queue.pop() { diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index bf954b4cdea2ede7f5efb75fef397e823e46f962..7cbce4137cf2d6c188236378ba877df3e45e5461 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -74,7 +74,8 @@ pub(crate) struct RuntimeComp { pub exiting: bool, } -/// Should contain everything that is accessible in a thread-safe manner +/// Should contain everything that is accessible in a thread-safe manner. May +/// NOT contain non-threadsafe fields. // TODO: Do something about the `num_handles` thing. This needs to be a bit more // "foolproof" to lighten the mental burden of using the `num_handles` // variable. @@ -95,12 +96,19 @@ pub(crate) struct CompHandle { } impl CompHandle { - fn new(id: CompId, public: &CompPublic) -> CompHandle { - let handle = CompHandle{ + /// Creates a new component handle and does not increment the reference + /// counter. + fn new_unincremented(id: CompId, public: &CompPublic) -> CompHandle { + return CompHandle{ target: public, id, #[cfg(debug_assertions)] decremented: false, }; + } + + /// Creates a new component handle and increments the reference counter. + fn new(id: CompId, public: &CompPublic) -> CompHandle { + let mut handle = Self::new_unincremented(id, public); handle.increment_users(); return handle; } @@ -232,10 +240,10 @@ impl Runtime { module_name, routine_name, ValueGroup::new_stack(Vec::new()) )?; - let reserved = self.inner.start_create_pdl_component(); + let reserved = self.inner.start_create_component(); let ctx = CompCtx::new(&reserved); let component = Box::new(CompPDL::new(prompt, 0)); - let (key, _) = self.inner.finish_create_pdl_component(reserved, component, ctx, false); + let (key, _) = self.inner.finish_create_component(reserved, component, ctx, false); self.inner.enqueue_work(key); return Ok(()) @@ -284,22 +292,24 @@ impl RuntimeInner { // Creating/destroying components - pub(crate) fn start_create_pdl_component(&self) -> CompReserved { + pub(crate) fn start_create_component(&self) -> CompReserved { self.increment_active_components(); let reservation = self.components.reserve(); return CompReserved{ reservation }; } - pub(crate) fn finish_create_pdl_component( + pub(crate) fn finish_create_component( &self, reserved: CompReserved, component: Box, mut context: CompCtx, initially_sleeping: bool, ) -> (CompKey, &mut RuntimeComp) { + // Construct runtime component let inbox_queue = QueueDynMpsc::new(16); let inbox_producer = inbox_queue.producer(); - let _id = reserved.id(); - context.id = reserved.id(); - let component = RuntimeComp { + let component_id = reserved.id(); + context.id = component_id; + + let mut component = RuntimeComp { public: CompPublic{ sleeping: AtomicBool::new(initially_sleeping), num_handles: AtomicU32::new(1), // the component itself acts like a handle @@ -311,10 +321,17 @@ impl RuntimeInner { exiting: false, }; + // Submit created component into storage. let index = self.components.submit(reserved.reservation, component); - debug_assert_eq!(index, _id.0); + debug_assert_eq!(index, component_id.0); let component = self.components.get_mut(index); + // Bit messy, but here we create the reference of a component to itself, + // the `num_handles` being initialized to `1` above, and add it to the + // component context. + let self_handle = CompHandle::new_unincremented(component_id, &component.public); + component.ctx.add_self_reference(self_handle); + return (CompKey(index), component); } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 4aea4d03e2928f658de0827159de084f48b5e5dc..491e9f411457e73f065101ee360593593b2cdc71 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -89,7 +89,7 @@ impl Scheduler { CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); }, CompScheduling::Exit => { component.component.on_shutdown(&scheduler_ctx); - self.mark_component_as_exiting(&scheduler_ctx, component); + self.mark_component_as_exiting(&scheduler_ctx, comp_key, component); } } } @@ -120,16 +120,14 @@ impl Scheduler { /// Marks the component as exiting by removing the reference it holds to /// itself. Afterward the component will enter "normal" sleeping mode (if it /// has not yet been destroyed) - fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) { + fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, comp_key: CompKey, component: &mut RuntimeComp) { // If we didn't yet decrement our reference count, do so now - let comp_key = unsafe{ component.ctx.id.upgrade() }; - if !component.exiting { component.exiting = true; - let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel); - let new_count = old_count - 1; - if new_count == 0 { + let maybe_comp_key = component.ctx.remove_self_reference(); + if let Some(_comp_key) = maybe_comp_key { + debug_assert_eq!(_comp_key.0, comp_key.0); sched_ctx.runtime.destroy_component(comp_key); return; } diff --git a/src/runtime2/tests/messaging.rs b/src/runtime2/tests/messaging.rs new file mode 100644 index 0000000000000000000000000000000000000000..fc4a0dd65f241980bcc60e866ee705620648a501 --- /dev/null +++ b/src/runtime2/tests/messaging.rs @@ -0,0 +1,119 @@ +use super::*; + + +#[test] +fn test_component_communication() { + let pd = ProtocolDescription::parse(b" + primitive sender(out o, u32 outside_loops, u32 inside_loops) { + u32 outside_index = 0; + while (outside_index < outside_loops) { + u32 inside_index = 0; + sync while (inside_index < inside_loops) { + put(o, inside_index); + inside_index += 1; + } + outside_index += 1; + } + } + + primitive receiver(in i, u32 outside_loops, u32 inside_loops) { + u32 outside_index = 0; + while (outside_index < outside_loops) { + u32 inside_index = 0; + sync while (inside_index < inside_loops) { + auto val = get(i); + while (val != inside_index) {} // infinite loop if incorrect value is received + inside_index += 1; + } + outside_index += 1; + } + } + + composite constructor() { + channel o_orom -> i_orom; + channel o_mrom -> i_mrom; + channel o_ormm -> i_ormm; + channel o_mrmm -> i_mrmm; + + // one round, one message per round + new sender(o_orom, 1, 1); + new receiver(i_orom, 1, 1); + + // multiple rounds, one message per round + new sender(o_mrom, 5, 1); + new receiver(i_mrom, 5, 1); + + // one round, multiple messages per round + new sender(o_ormm, 1, 5); + new receiver(i_ormm, 1, 5); + + // multiple rounds, multiple messages per round + new sender(o_mrmm, 5, 5); + new receiver(i_mrmm, 5, 5); + }").expect("compilation"); + let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap(); + create_component(&rt, "", "constructor", no_args()); +} + +#[test] +fn test_send_to_self() { + compile_and_create_component(" + primitive insane_in_the_membrane() { + channel a -> b; + sync { + put(a, 1); + auto v = get(b); + while (v != 1) {} + } + } + ", "insane_in_the_membrane", no_args()); +} + +#[test] +fn test_intermediate_messenger() { + let pd = ProtocolDescription::parse(b" + primitive receiver(in rx, u32 num) { + auto index = 0; + while (index < num) { + sync { auto v = get(rx); } + index += 1; + } + } + + primitive middleman(in rx, out tx, u32 num) { + auto index = 0; + while (index < num) { + sync { put(tx, get(rx)); } + index += 1; + } + } + + primitive sender(out tx, u32 num) { + auto index = 0; + while (index < num) { + sync put(tx, 1337); + index += 1; + } + } + + composite constructor_template() { + auto num = 0; + channel tx_a -> rx_a; + channel tx_b -> rx_b; + new sender(tx_a, 3); + new middleman(rx_a, tx_b, 3); + new receiver(rx_b, 3); + } + + composite constructor() { + new constructor_template(); + new constructor_template(); + new constructor_template(); + new constructor_template(); + new constructor_template(); + new constructor_template(); + } + ").expect("compilation"); + let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap(); + create_component(&rt, "", "constructor", no_args()); +} diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 6baa362a0c2dd9fc572b89a4c7b005151ed2f6ac..85572c586c717aafef2405f6d35172c9dfe4b791 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -3,10 +3,12 @@ use crate::protocol::eval::*; use crate::runtime2::runtime::*; use crate::runtime2::component::{CompCtx, CompPDL}; +mod messaging; mod error_handling; +mod transfer_ports; const LOG_LEVEL: LogLevel = LogLevel::Debug; -const NUM_THREADS: u32 = 4; +const NUM_THREADS: u32 = 1; pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) { let protocol = ProtocolDescription::parse(source.as_bytes()) @@ -20,10 +22,10 @@ pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &s let prompt = rt.inner.protocol.new_component( module_name.as_bytes(), routine_name.as_bytes(), args ).expect("create prompt"); - let reserved = rt.inner.start_create_pdl_component(); + let reserved = rt.inner.start_create_component(); let ctx = CompCtx::new(&reserved); let component = Box::new(CompPDL::new(prompt, 0)); - let (key, _) = rt.inner.finish_create_pdl_component(reserved, component, ctx, false); + let (key, _) = rt.inner.finish_create_component(reserved, component, ctx, false); rt.inner.enqueue_work(key); } @@ -44,109 +46,6 @@ fn test_component_creation() { } } -#[test] -fn test_component_communication() { - let pd = ProtocolDescription::parse(b" - primitive sender(out o, u32 outside_loops, u32 inside_loops) { - u32 outside_index = 0; - while (outside_index < outside_loops) { - u32 inside_index = 0; - sync while (inside_index < inside_loops) { - put(o, inside_index); - inside_index += 1; - } - outside_index += 1; - } - } - - primitive receiver(in i, u32 outside_loops, u32 inside_loops) { - u32 outside_index = 0; - while (outside_index < outside_loops) { - u32 inside_index = 0; - sync while (inside_index < inside_loops) { - auto val = get(i); - while (val != inside_index) {} // infinite loop if incorrect value is received - inside_index += 1; - } - outside_index += 1; - } - } - - composite constructor() { - channel o_orom -> i_orom; - channel o_mrom -> i_mrom; - channel o_ormm -> i_ormm; - channel o_mrmm -> i_mrmm; - - // one round, one message per round - new sender(o_orom, 1, 1); - new receiver(i_orom, 1, 1); - - // multiple rounds, one message per round - new sender(o_mrom, 5, 1); - new receiver(i_mrom, 5, 1); - - // one round, multiple messages per round - new sender(o_ormm, 1, 5); - new receiver(i_ormm, 1, 5); - - // multiple rounds, multiple messages per round - new sender(o_mrmm, 5, 5); - new receiver(i_mrmm, 5, 5); - }").expect("compilation"); - let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap(); - create_component(&rt, "", "constructor", no_args()); -} - -#[test] -fn test_intermediate_messenger() { - let pd = ProtocolDescription::parse(b" - primitive receiver(in rx, u32 num) { - auto index = 0; - while (index < num) { - sync { auto v = get(rx); } - index += 1; - } - } - - primitive middleman(in rx, out tx, u32 num) { - auto index = 0; - while (index < num) { - sync { put(tx, get(rx)); } - index += 1; - } - } - - primitive sender(out tx, u32 num) { - auto index = 0; - while (index < num) { - sync put(tx, 1337); - index += 1; - } - } - - composite constructor_template() { - auto num = 0; - channel tx_a -> rx_a; - channel tx_b -> rx_b; - new sender(tx_a, 3); - new middleman(rx_a, tx_b, 3); - new receiver(rx_b, 3); - } - - composite constructor() { - new constructor_template(); - new constructor_template(); - new constructor_template(); - new constructor_template(); - new constructor_template(); - new constructor_template(); - } - ").expect("compilation"); - let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap(); - create_component(&rt, "", "constructor", no_args()); -} - #[test] fn test_simple_select() { let pd = ProtocolDescription::parse(b" diff --git a/src/runtime2/tests/transfer_ports.rs b/src/runtime2/tests/transfer_ports.rs new file mode 100644 index 0000000000000000000000000000000000000000..5fa876e7a88677c0571bc8579df721893da1e865 --- /dev/null +++ b/src/runtime2/tests/transfer_ports.rs @@ -0,0 +1,21 @@ +use super::*; + +#[test] +fn test_transfer_precreated_port_without_using() { + compile_and_create_component(" + primitive port_sender(out> tx) { + channel a -> b; + sync put(tx, b); + } + + primitive port_receiver(in> rx) { + sync auto a = get(rx); + } + + composite constructor() { + channel a -> b; + new port_sender(a); + new port_receiver(b); + } + ", "constructor", no_args()); +} \ No newline at end of file