diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs
index cebb7b67d319eb1fb2106c9ac06857d5a575659f..7be0b8b8ec97bb4981dccfaab90c50de37fcba94 100644
--- a/src/runtime2/communication.rs
+++ b/src/runtime2/communication.rs
@@ -176,11 +176,17 @@ pub enum ControlMessageContent {
     Ack,
     BlockPort(PortId),
     UnblockPort(PortId),
-    ClosePort(PortId),
+    ClosePort(ControlMessageClosePort),
     PortPeerChangedBlock(PortId),
     PortPeerChangedUnblock(PortId, CompId),
 }
 
+#[derive(Debug)]
+pub struct ControlMessageClosePort {
+    pub port_to_close: PortId, // ID of the receiving port
+    pub closed_in_sync_round: bool, // needed to ensure correct handling of errors
+}
+
 // -----------------------------------------------------------------------------
 // Messages (generic)
 // -----------------------------------------------------------------------------
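With this change, the side that initiates the close handshake has to construct the richer payload instead of a bare `PortId`. A minimal sketch of that construction, using only the types introduced above (the helper function itself is hypothetical and not part of this patch):

```rust
// Hypothetical helper on the initiating side. `port_to_close` identifies the
// receiving component's port; `closed_in_sync_round` tells the receiver
// whether the close happened while a synchronous round was in progress.
fn build_close_port_content(port_to_close: PortId, closed_in_sync_round: bool) -> ControlMessageContent {
    ControlMessageContent::ClosePort(ControlMessageClosePort{
        port_to_close,
        closed_in_sync_round,
    })
}
```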
diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs
index a0cc811fcc5e0ee8cfbf52a3be015719545f2dbd..f3a14e53bda533ed9cf9c8fcb57c487321bf6225 100644
--- a/src/runtime2/component/component.rs
+++ b/src/runtime2/component/component.rs
@@ -155,18 +155,22 @@ pub(crate) fn create_component(
 pub(crate) fn default_send_data_message(
     exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup,
     sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx
-) -> CompScheduling {
+) -> Result<CompScheduling, String> { // @nocommit: Something better than Err(String)
     debug_assert_eq!(exec_state.mode, CompMode::Sync);
-    // TODO: Handle closed ports
     let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
     let port_info = comp_ctx.get_port(port_handle);
     debug_assert_eq!(port_info.kind, PortKind::Putter);
-    if port_info.state.is_blocked() {
+
+    if port_info.state == PortState::Closed {
+        // Note: the peer's metadata is normally only eventually consistent, but
+        // once the peer has shut down it no longer changes, so we may treat it as consistent.
+        return Err(format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0));
+    } else if port_info.state.is_blocked() {
         // Port is blocked, so we cannot send
         exec_state.set_as_blocked_put(transmitting_port_id, value);
-        return CompScheduling::Sleep;
+        return Ok(CompScheduling::Sleep);
     } else {
         // Port is not blocked, so send to the peer
         let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
@@ -174,7 +178,7 @@ pub(crate) fn default_send_data_message(
         let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
         peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true);
 
-        return CompScheduling::Immediate;
+        return Ok(CompScheduling::Immediate);
     }
 }
 
@@ -279,7 +283,7 @@ pub(crate) fn default_handle_received_data_message(
 pub(crate) fn default_handle_control_message(
     exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
     message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
-) {
+) -> Result<(), String> { // @nocommit, use something else than Err(String)
     match message.content {
         ControlMessageContent::Ack => {
             default_handle_ack(control, message.id, sched_ctx, comp_ctx);
@@ -295,22 +299,55 @@ pub(crate) fn default_handle_control_message(
                 comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
             }
         },
-        ControlMessageContent::ClosePort(port_id) => {
+        ControlMessageContent::ClosePort(content) => {
             // Request to close the port. We immediately comply and remove
             // the component handle as well
-            let port_handle = comp_ctx.get_port_handle(port_id);
-            let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
+            let port_handle = comp_ctx.get_port_handle(content.port_to_close);
+            let port_info = comp_ctx.get_port_mut(port_handle);
+            let peer_comp_id = port_info.peer_comp_id;
+            let last_instruction = port_info.last_instruction;
+
+            // We're closing the port, so we always update the port's peer
+            // (such that later error messages reach the right component)
+            port_info.peer_comp_id = message.sender_comp_id;
             let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
             // One exception to sending an `Ack` is if we just closed the
             // port ourselves, meaning that the `ClosePort` messages got
             // sent to one another.
             if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
+                // The two components (the sender and this component) are
+                // closing the channel at the same time.
                 default_handle_ack(control, control_id, sched_ctx, comp_ctx);
             } else {
+                // Respond to the message
+                let port_was_used = last_instruction != PortInstruction::None;
                 default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
                 comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
                 comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
+
+                // Check whether we have reached an error condition. Note that
+                // if this condition is not met we don't error out now, but we
+                // may still error out in the next sync block when we try to
+                // `put`/`get` on the port. The condition ensures that a
+                // successful sync round that is followed by the peer closing
+                // the port is not mistakenly considered a failed sync
+                // round.
+                if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used {
+                    let error_message = match last_instruction {
+                        PortInstruction::None => unreachable!(), // port was used
+                        PortInstruction::NoSource => format!(
+                            "Peer component (id:{}) shut down, so the operation on this port cannot have succeeded",
+                            message.sender_comp_id.0
+                        ),
+                        PortInstruction::SourceLocation(_source_location) => format!(
+                            "Peer component (id:{}) shut down, so this operation cannot have succeeded",
+                            message.sender_comp_id.0
+                        ),
+                    };
+
+                    return Err(error_message);
+                }
             }
         },
         ControlMessageContent::UnblockPort(port_id) => {
@@ -351,6 +388,8 @@ pub(crate) fn default_handle_control_message(
             default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
         }
     }
+
+    return Ok(());
 }
 
 /// Handles a component initiating the exiting procedure, and closing all of its
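Both functions now report errors to their callers instead of only returning a scheduling decision, so every call site (not shown in this patch) has to thread the `Result` through. A hypothetical caller might look like the following; the logging and the `CompScheduling::Exit` variant are assumptions, not confirmed API of this codebase:

```rust
// Hypothetical call site: turn a failed send into an error path instead of a
// normal scheduling decision.
fn handle_send_outcome(result: Result<CompScheduling, String>) -> CompScheduling {
    match result {
        Ok(scheduling) => scheduling,
        Err(message) => {
            // Placeholder: the real runtime would presumably record the error
            // and start the component's exit procedure here.
            eprintln!("component error: {}", message);
            CompScheduling::Exit // assumed variant; substitute the real exit scheduling
        }
    }
}
```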
diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs
index f419c322f5e4d68297549ccc137e412065c5c568..fc49944ad2baeff421181bece2c7b2567731d203 100644
--- a/src/runtime2/component/component_context.rs
+++ b/src/runtime2/component/component_context.rs
@@ -2,6 +2,16 @@
 use crate::runtime2::scheduler::*;
 use crate::runtime2::runtime::*;
 use crate::runtime2::communication::*;
+use crate::protocol::ExpressionId;
+
+/// Helper enum to remember the last operation that was performed on a port
+#[derive(Debug, PartialEq, Copy, Clone)]
+pub enum PortInstruction {
+    None,
+    NoSource,
+    SourceLocation(ExpressionId),
+}
+
 #[derive(Debug)]
 pub struct Port {
     pub self_id: PortId,
@@ -9,6 +19,7 @@ pub struct Port {
     pub peer_port_id: PortId, // eventually consistent
     pub kind: PortKind,
     pub state: PortState,
+    pub last_instruction: PortInstruction,
     #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool,
 }
@@ -58,6 +69,7 @@ impl CompCtx {
             kind: PortKind::Putter,
             state: PortState::Open,
             peer_comp_id: self.id,
+            last_instruction: PortInstruction::None,
             #[cfg(debug_assertions)] associated_with_peer: false,
         });
         self.ports.push(Port{
@@ -66,6 +78,7 @@ impl CompCtx {
             kind: PortKind::Getter,
             state: PortState::Open,
             peer_comp_id: self.id,
+            last_instruction: PortInstruction::None,
             #[cfg(debug_assertions)] associated_with_peer: false,
         });
 
@@ -77,6 +90,7 @@ impl CompCtx {
         let self_id = PortId(self.take_port_id());
         self.ports.push(Port{
             self_id, peer_comp_id, peer_port_id, kind, state,
+            last_instruction: PortInstruction::None,
             #[cfg(debug_assertions)] associated_with_peer: false,
         });
         return LocalPortHandle(self_id);
diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs
index f9f3ab3f1df69fe514ced318d8eb388adec96ea9..881a7f6fc3400aded9a2d379dc1f08f2f2212613 100644
--- a/src/runtime2/component/consensus.rs
+++ b/src/runtime2/component/consensus.rs
@@ -632,7 +632,7 @@ impl Consensus {
     }
 
     fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
-        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
+        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader. TODO: @NoDirectHandle
         let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
         leader_info.send_message(&sched_ctx.runtime, message, true);
         let should_remove = leader_info.decrement_users();
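Note that this patch only ever initializes `last_instruction` to `PortInstruction::None`; for the error check in `default_handle_control_message` to fire, the `put`/`get` code paths must stamp the port with the instruction that used it. A sketch of the implied bookkeeping, assuming the `get_port_mut` and `LocalPortHandle` APIs that appear elsewhere in this patch (the helper name itself is invented):

```rust
impl CompCtx {
    // Invented helper: record which instruction last touched the port, so a
    // later ClosePort from the peer can point at the failing operation.
    pub(crate) fn set_last_instruction(&mut self, port_handle: LocalPortHandle, instruction: PortInstruction) {
        self.get_port_mut(port_handle).last_instruction = instruction;
    }
}
```

A `put` with a known source location would then call `comp_ctx.set_last_instruction(port_handle, PortInstruction::SourceLocation(expr_id))` before sending, while runtime-internal operations without a source location would use `PortInstruction::NoSource`.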