diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index a0cc811fcc5e0ee8cfbf52a3be015719545f2dbd..f3a14e53bda533ed9cf9c8fcb57c487321bf6225 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -155,18 +155,22 @@ pub(crate) fn create_component( pub(crate) fn default_send_data_message( exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup, sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx -) -> CompScheduling { +) -> Result { // @nocommit: Something better than Err(String) debug_assert_eq!(exec_state.mode, CompMode::Sync); - // TODO: Handle closed ports let port_handle = comp_ctx.get_port_handle(transmitting_port_id); let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state.is_blocked() { + + if port_info.state == PortState::Closed { + // Note: normally peer is eventually consistent, but if it has shut down + // then we can be sure it is consistent (I think?) + return Err(format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)) + } else if port_info.state.is_blocked() { // Port is blocked, so we cannot send exec_state.set_as_blocked_put(transmitting_port_id, value); - return CompScheduling::Sleep; + return Ok(CompScheduling::Sleep); } else { // Port is not blocked, so send to the peer let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); @@ -174,7 +178,7 @@ pub(crate) fn default_send_data_message( let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true); - return CompScheduling::Immediate; + return Ok(CompScheduling::Immediate); } } @@ -279,7 +283,7 @@ pub(crate) fn default_handle_received_data_message( pub(crate) fn default_handle_control_message( exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx -) { +) -> Result<(), String> { // @nocommit, use something else than Err(String) match message.content { ControlMessageContent::Ack => { default_handle_ack(control, message.id, sched_ctx, comp_ctx); @@ -295,22 +299,55 @@ pub(crate) fn default_handle_control_message( comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); } }, - ControlMessageContent::ClosePort(port_id) => { + ControlMessageContent::ClosePort(content) => { // Request to close the port. We immediately comply and remove // the component handle as well - let port_handle = comp_ctx.get_port_handle(port_id); - let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; + let port_handle = comp_ctx.get_port_handle(content.port_id); + let port_info = comp_ctx.get_port_mut(port_handle); + let peer_comp_id = port_info.peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); + // We're closing the port, so we will always update the peer of the + // port (in case of error messages) + port_info.peer_comp_id = message.sender_comp_id; + // One exception to sending an `Ack` is if we just closed the // port ourselves, meaning that the `ClosePort` messages got // sent to one another. if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) { + // The two components (sender and this component) are closing + // the channel at the same time. default_handle_ack(control, control_id, sched_ctx, comp_ctx); } else { + // Respond to the message + let last_instruction = port_info.last_instruction; + let port_was_used = last_instruction != PortInstruction::None; default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed + + // Make sure that we've not reached an error condition. Note + // that if this condition is not met, then we don't error out + // now, but we may error out in the next sync block when we + // try to `put`/`get` on the port. This condition makes sure + // that if we have a successful sync round, followed by the peer + // closing the port, that we don't consider the sync round to + // have failed by mistake. + if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used { + let error_message = match last_instruction { + PortInstruction::None => unreachable!(), // port was used + PortInstruction::NoSource => format!( + "Peer component (id:{}) shut down, so operation on port cannot have succeeded", + message.sender_comp_id.0 + ), + PortInstruction::SourceLocation(source_location) => format!( + "Peer component (id:{}) shut down, so this operation cannot have succeeded", + message.sender_comp_id.0 + ), + }; + + return Err(error_message); + } } }, ControlMessageContent::UnblockPort(port_id) => { @@ -351,6 +388,8 @@ pub(crate) fn default_handle_control_message( default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); } } + + return Ok(()); } /// Handles a component initiating the exiting procedure, and closing all of its