From 03c2c3f9625026bf50c184e8b26621a7f8ce4758 2022-05-11 11:39:49 From: mh Date: 2022-05-11 11:39:49 Subject: [PATCH] Fix bug involving interaction between transmitting ports and blocking --- diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index e3bbc5105e1884f3a66e3cd2002165d27c8b10bc..782399c46374828d05bb0b6074b2511e7de8a519 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -78,7 +78,8 @@ pub(crate) enum CompMode { BlockedGet, // blocked because we need to receive a message on a particular port BlockedPut, // component is blocked because the port is blocked BlockedSelect, // waiting on message to complete the select statement - BlockedPutPorts,// blocked because we're waiting to send a data message containing ports + BlockedPutPortsAwaitingAcks,// blocked because we're waiting to send a data message containing ports, but first need to receive Acks for the PortPeerChanged messages + BlockedPutPortsReady, // blocked because we're waitingto send a data message containing ports StartExit, // temporary state: if encountered then we start the shutdown process. BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 @@ -89,7 +90,8 @@ impl CompMode { use CompMode::*; match self { - Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => true, + Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | + BlockedPutPortsAwaitingAcks | BlockedPutPortsReady => true, NonSync | StartExit | BusyExit | Exit => false, } } @@ -98,7 +100,8 @@ impl CompMode { use CompMode::*; match self { - NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => false, + NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | + BlockedPutPortsAwaitingAcks | BlockedPutPortsReady => false, StartExit | BusyExit => true, Exit => false, } @@ -169,17 +172,29 @@ impl CompExecState { self.mode_port == port; } - pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) { + pub(crate) fn set_as_blocked_put_without_ports(&mut self, port: PortId, value: ValueGroup) { self.mode = CompMode::BlockedPut; self.mode_port = port; self.mode_value = value; } - pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool { + pub(crate) fn set_as_blocked_put_with_ports(&mut self, port: PortId, value: ValueGroup) { + self.mode = CompMode::BlockedPutPortsAwaitingAcks; + self.mode_port = port; + self.mode_value = value; + } + + pub(crate) fn is_blocked_on_put_without_ports(&self, port: PortId) -> bool { return self.mode == CompMode::BlockedPut && self.mode_port == port; } + + pub(crate) fn is_blocked_on_put_with_ports(&self, port: PortId) -> bool { + return + self.mode == CompMode::BlockedPutPortsReady && + self.mode_port == port; + } } // TODO: Replace when implementing port sending. Should probably be incorporated @@ -245,6 +260,9 @@ pub(crate) fn default_send_data_message( let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); + let mut ports = Vec::new(); + find_ports_in_value_group(&value, &mut ports); + if port_info.state.is_closed() { // Note: normally peer is eventually consistent, but if it has shut down // then we can be sure it is consistent (I think?) @@ -252,31 +270,26 @@ pub(crate) fn default_send_data_message( port_info.last_instruction, format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0) )) + } else if !ports.is_empty() { + prepare_send_message_with_ports( + transmitting_port_id, port_instruction, value, exec_state, + comp_ctx, sched_ctx, control + )?; + + return Ok(CompScheduling::Sleep); } else if port_info.state.is_blocked() { // Port is blocked, so we cannot send - exec_state.set_as_blocked_put(transmitting_port_id, value); + exec_state.set_as_blocked_put_without_ports(transmitting_port_id, value); return Ok(CompScheduling::Sleep); } else { - // Check if there are any ports that are being transmitted - let mut ports = Vec::new(); - find_ports_in_value_group(&value, &mut ports); - if !ports.is_empty() { - prepare_send_message_with_ports( - transmitting_port_id, port_instruction, value, exec_state, - comp_ctx, sched_ctx, control - )?; - - return Ok(CompScheduling::Sleep); - } else { - // Port is not blocked and no ports to transfer: send to the peer - let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); - let peer_info = comp_ctx.get_peer(peer_handle); - let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); - peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); + // Port is not blocked and no ports to transfer: send to the peer + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); - return Ok(CompScheduling::Immediate); - } + return Ok(CompScheduling::Immediate); } } @@ -588,7 +601,10 @@ pub(crate) fn default_handle_control_message( debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers); - default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + default_handle_recently_unblocked_port( + exec_state, consensus, port_handle, sched_ctx, comp_ctx, + inbox_main, inbox_backup + ); }, ControlMessageContent::PortPeerChangedBlock => { // The peer of our port has just changed. So we are asked to @@ -616,7 +632,10 @@ pub(crate) fn default_handle_control_message( port_info.state.clear(PortStateFlag::BlockedDueToPeerChange); comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id)); - default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + default_handle_recently_unblocked_port( + exec_state, consensus, port_handle, sched_ctx, comp_ctx, + inbox_main, inbox_backup + ); } } @@ -876,21 +895,19 @@ fn prepare_send_message_with_ports( // We've set up the protocol, once all the PPC's are blocked we are supposed // to transfer the message to the recipient. So store it temporarily - exec_state.mode = CompMode::BlockedPutPorts; - exec_state.mode_port = sending_port_id; - exec_state.mode_value = value; + exec_state.set_as_blocked_put_with_ports(sending_port_id, value); return Ok(()); } /// Performs the transmission of a data message that contains ports. These were /// all stored in the component's execution state by the -/// `prepare_send_message_with_ports` function. +/// `prepare_send_message_with_ports` function. Port must be ready to send! fn perform_send_message_with_ports( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, consensus: &mut Consensus, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) { - debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPorts); + debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPortsReady); // Find all ports again let mut transmit_ports = Vec::new(); @@ -898,6 +915,7 @@ fn perform_send_message_with_ports( let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); let port_info = comp_ctx.get_port(port_handle); + debug_assert!(port_info.state.can_send() && !port_info.state.is_blocked()); let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); // Annotate the data message @@ -963,11 +981,16 @@ fn default_handle_ack( debug_assert!(_should_remove.is_none()); }, AckAction::UnblockPutWithPorts => { - // Send the message (containing ports) to the recipient + // Send the message (containing ports) stored in the component + // execution state to the recipient println!("DEBUG: Unblocking put with ports"); - perform_send_message_with_ports(exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup); - exec_state.mode = CompMode::Sync; - exec_state.mode_port = PortId::new_invalid(); + exec_state.mode = CompMode::BlockedPutPortsReady; + let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + + default_handle_recently_unblocked_port( + exec_state, consensus, port_handle, sched_ctx, comp_ctx, + inbox_main, inbox_backup + ); }, AckAction::None => {} } @@ -994,16 +1017,26 @@ fn default_send_ack( } /// Handles the unblocking of a putter port. In case there is a pending message -/// on that port then it will be sent. +/// on that port then it will be sent. There are two reasons for calling this +/// function: either a port was blocked (i.e. the Blocked state flag was +/// cleared), or the component is ready to send a message containing ports +/// (stored in the execution state). In this latter case we might still have +/// a blocked port. fn default_handle_recently_unblocked_port( exec_state: &mut CompExecState, consensus: &mut Consensus, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) { let port_info = comp_ctx.get_port_mut(port_handle); let port_id = port_info.self_id; - debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller - if exec_state.is_blocked_on_put(port_id) { + if port_info.state.is_blocked() { + // Port is still blocked. We wait until the next control message where + // we unblock the port. + return; + } + + if exec_state.is_blocked_on_put_without_ports(port_id) { // Return to the regular execution mode exec_state.mode = CompMode::Sync; exec_state.mode_port = PortId::new_invalid(); @@ -1021,6 +1054,16 @@ fn default_handle_recently_unblocked_port( exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state. exec_state.mode_port = PortId::new_invalid(); + } else if exec_state.is_blocked_on_put_with_ports(port_id) { + // Port is not blocked, and we've completed our part of the + // port-transfer protocol. So send the message + perform_send_message_with_ports( + exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup + ); + + exec_state.mode = CompMode::Sync; + exec_state.mode_port = PortId::new_invalid(); + debug_assert!(exec_state.mode_value.values.is_empty()); } } diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 23c5bc24fb5e56fab02d3287b93f6d8a81d5b536..cf10c736583f7d97e9f3040e6c5256eae01d2a11 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -125,7 +125,9 @@ impl Component for ComponentTcpClient { sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state)); match self.exec_state.mode { - CompMode::BlockedSelect | CompMode::BlockedPutPorts => { + CompMode::BlockedSelect | + CompMode::BlockedPutPortsAwaitingAcks | + CompMode::BlockedPutPortsReady => { // Not possible: we never enter this state unreachable!(); }, diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 72e1a7dd27cf4167fa96d46a68a74763d6e6d111..2227d58f69243c627a582a07643868bfbbd6ea36 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -284,8 +284,9 @@ impl Component for CompPDL { CompMode::NonSync | CompMode::Sync => { // continue and run PDL code }, - CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | - CompMode::BlockedSelect | CompMode::BlockedPutPorts => { + CompMode::SyncEnd | CompMode::BlockedGet | + CompMode::BlockedPut | CompMode::BlockedSelect | + CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady => { return CompScheduling::Sleep; } CompMode::StartExit => return component::default_handle_start_exit( diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs index b2f1fc7e1e569543728a739cddca12a4c6a0e766..7c6063bc4d436adb21fdfb9c83290a3957d2e9ed 100644 --- a/src/runtime2/component/component_random.rs +++ b/src/runtime2/component/component_random.rs @@ -62,7 +62,8 @@ impl Component for ComponentRandomU32 { sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); match self.exec_state.mode { - CompMode::BlockedGet | CompMode::BlockedSelect | CompMode::BlockedPutPorts => { + CompMode::BlockedGet | CompMode::BlockedSelect | + CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady => { // impossible for this component, no input ports and no select // blocks unreachable!();