diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 7e450523700cc9aebe4153b98da061478f4cc1c5..94a11291b2605b9209da0974ff3a32044e0a32b0 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -4,7 +4,6 @@ use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortI use crate::protocol::*; use crate::runtime2::*; use crate::runtime2::communication::*; -use crate::runtime2::component::component_pdl::find_ports_in_value_group; use super::{CompCtx, CompPDL, CompId}; use super::component_context::*; @@ -234,7 +233,8 @@ pub(crate) fn create_component( pub(crate) fn default_send_data_message( exec_state: &mut CompExecState, transmitting_port_id: PortId, port_instruction: PortInstruction, value: ValueGroup, - sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx + sched_ctx: &SchedulerCtx, consensus: &mut Consensus, + control: &mut ControlLayer, comp_ctx: &mut CompCtx ) -> Result { debug_assert_eq!(exec_state.mode, CompMode::Sync); @@ -262,33 +262,21 @@ pub(crate) fn default_send_data_message( let mut ports = Vec::new(); find_ports_in_value_group(&value, &mut ports); if !ports.is_empty() { + prepare_send_message_with_ports( + transmitting_port_id, port_instruction, value, exec_state, + comp_ctx, sched_ctx, control + )?; + return Ok(CompScheduling::Sleep); + } else { + // Port is not blocked and no ports to transfer: send to the peer + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); - for (value_location, port_id) in ports { - let transmitted_port_handle = comp_ctx.get_port_handle(port_id); - let transmitted_port = comp_ctx.get_port(transmitted_port_handle); - - if transmitted_port.state.is_set(PortStateFlag::Transmitted) { - // Note: We could also attach where the port has been - // transferred - return Err(( - port_info.last_instruction, - String::from("Cannot send this message, as it contains a previously transmitted port") - )); - } - - // Prepare ack for PPC - // Prepare PPC message - } + return Ok(CompScheduling::Immediate); } - - // Port is not blocked, so send to the peer - let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); - let peer_info = comp_ctx.get_peer(peer_handle); - let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); - peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); - - return Ok(CompScheduling::Immediate); } } @@ -416,21 +404,20 @@ pub(crate) fn default_attempt_get( /// of full buffers (hence, will use the control layer to make sure the peer /// will become unblocked). pub(crate) fn default_handle_received_data_message( - targeted_port: PortId, port_instruction: PortInstruction, message: &mut DataMessage, + targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> Result<(), (PortInstruction, String)> { let port_handle = comp_ctx.get_port_handle(targeted_port); let port_index = comp_ctx.get_port_index(port_handle); - let slot = &mut inbox_main[port_index]; - debug_assert!(slot.is_none()); // because we've just received from it + debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it // If we received any ports, add them to the port tracking and inbox struct. // Then notify the peers that they can continue sending to this port, but // now at a new address. for received_port in &mut message.ports { // Transfer messages to main/backup inbox - let new_inbox_index = inbox_main.len(); + let _new_inbox_index = inbox_main.len(); if !received_port.messages.is_empty() { inbox_main.push(Some(received_port.messages.remove(0))); } @@ -443,11 +430,12 @@ pub(crate) fn default_handle_received_data_message( received_port.peer_comp, received_port.peer_port, received_port.kind, new_port_state ); + debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle)); + comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp)); let new_port = comp_ctx.get_port(new_port_handle); - comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(new_port.peer_comp_id)); // Replace all references to the port in the received message - for message_location in received_port.locations { + for message_location in received_port.locations.iter().copied() { let value = match message_location { ValueId::Heap(heap_pos, heap_index) => &mut message.content.regions[heap_pos as usize][heap_index as usize], ValueId::Stack(stack_index) => &mut message.content.values[stack_index as usize], @@ -474,7 +462,7 @@ pub(crate) fn default_handle_received_data_message( id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(new_port.peer_port_id), - content: ControlMessageContent::PortPeerChangedUnblock(new_port.id, comp_ctx.id) + content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id) }), true); } @@ -490,7 +478,7 @@ pub(crate) fn default_handle_received_data_message( // One more message, place it in the slot let message = inbox_backup.remove(message_index); debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup - *slot = Some(message); + inbox_main[port_index] = Some(message); return Ok(()); } @@ -522,7 +510,7 @@ pub(crate) fn default_handle_control_message( ) -> Result<(), (PortInstruction, String)> { match message.content { ControlMessageContent::Ack => { - default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, inbox_main, inbox_backup); + default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup); }, ControlMessageContent::BlockPort => { // One of our messages was accepted, but the port should be @@ -555,7 +543,7 @@ pub(crate) fn default_handle_control_message( // The two components (sender and this component) are closing // the channel at the same time. So we don't care about the // content of the `ClosePort` message. - default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, inbox_main, inbox_backup); + default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup); } else { // Respond to the message let port_info = comp_ctx.get_port(port_handle); @@ -622,7 +610,6 @@ pub(crate) fn default_handle_control_message( let port_handle = comp_ctx.get_port_handle(port_to_change); let port_info = comp_ctx.get_port(port_handle); debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange)); - let old_peer_id = port_info.peer_comp_id; let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_port_id = new_port_id; @@ -772,6 +759,7 @@ pub(crate) fn default_handle_sync_decision( if port_info.close_at_sync_end { port_info.state.set(PortStateFlag::Closed); } + port_info.state.clear(PortStateFlag::Received); } debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); exec_state.mode = CompMode::NonSync; @@ -917,8 +905,8 @@ fn perform_send_message_with_ports( // And further enhance the message by adding data about the ports that are // being transferred - for (port_locations, port_id) in transmit_ports { - let transmit_port_handle = comp_ctx.get_port_handle(port_id); + for (port_locations, transmit_port_id) in transmit_ports { + let transmit_port_handle = comp_ctx.get_port_handle(transmit_port_id); let transmit_port_info = comp_ctx.get_port(transmit_port_handle); let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup); @@ -945,8 +933,8 @@ fn perform_send_message_with_ports( /// Handles an `Ack` for the control layer. fn default_handle_ack( exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId, - sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, inbox_main: &mut InboxMain, - inbox_backup: &mut InboxBackup + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) { // Since an `Ack` may cause another one, handle them in a loop let mut to_ack = control_id; @@ -1084,7 +1072,7 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut En // Clear the ports, then scan all the available values ports.clear(); - for (value_index, value) in &value_group.values.iter().enumerate() { + for (value_index, value) in value_group.values.iter().enumerate() { find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports); } }