diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 461fc510cb5480691d3578b2a811defe5880ef73..57d89d9ba93a959fd7f7c14ebe9ff3d5f0fa7f0c 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -243,7 +243,7 @@ pub(crate) fn default_send_data_message( let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::Closed { + if port_info.state.is_closed() { // Note: normally peer is eventually consistent, but if it has shut down // then we can be sure it is consistent (I think?) return Err(( @@ -312,10 +312,9 @@ pub(crate) fn default_handle_incoming_data_message( // Slot is already full, so if the port was previously opened, it will // now become closed let port_info = comp_ctx.get_port_mut(port_handle); - debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future + if port_info.state.is_open() { + port_info.state.set(PortStateFlag::BlockedDueToFullBuffers); - if port_info.state == PortState::Open { - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); let (peer_handle, message) = control.initiate_port_blocking(comp_ctx, port_handle); let peer = comp_ctx.get_peer(peer_handle); @@ -346,9 +345,7 @@ pub(crate) fn default_attempt_get( let port_info = comp_ctx.get_port_mut(port_handle); port_info.last_instruction = target_port_instruction; - - let port_is_closed = port_info.state == PortState::Closed; - if port_is_closed { + if port_info.state.is_closed() { let peer_id = port_info.peer_comp_id; return GetResult::Error(( target_port_instruction, @@ -402,18 +399,17 @@ pub(crate) fn default_handle_received_data_message( debug_assert!(slot.is_none()); // because we've just received from it // Modify last-known location where port instruction was retrieved - let port_info = comp_ctx.get_port_mut(port_handle); + let port_info = comp_ctx.get_port(port_handle); debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller - debug_assert_ne!(port_info.state, PortState::Closed); // checked by caller + debug_assert!(port_info.state.is_open()); // checked by caller // Check if there are any more messages in the backup buffer - let port_info = comp_ctx.get_port(port_handle); for message_index in 0..inbox_backup.len() { let message = &inbox_backup[message_index]; if message.data_header.target_port == targeted_port { // One more message, place it in the slot let message = inbox_backup.remove(message_index); - debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup + debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup *slot = Some(message); return Ok(()); @@ -422,8 +418,10 @@ pub(crate) fn default_handle_received_data_message( // Did not have any more messages, so if we were blocked, then we need to // unblock the port now (and inform the peer of this unblocking) - if port_info.state == PortState::BlockedDueToFullBuffers { - comp_ctx.set_port_state(port_handle, PortState::Open); + if port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers) { + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers); + let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle); let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true); @@ -445,21 +443,20 @@ pub(crate) fn default_handle_control_message( ControlMessageContent::Ack => { default_handle_ack(control, message.id, sched_ctx, comp_ctx); }, - ControlMessageContent::BlockPort(port_id) => { + ControlMessageContent::BlockPort => { // One of our messages was accepted, but the port should be // blocked. - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); + let port_to_block = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_block); + let port_info = comp_ctx.get_port_mut(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::Open { - // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); - } + port_info.state.set(PortStateFlag::BlockedDueToFullBuffers); }, ControlMessageContent::ClosePort(content) => { // Request to close the port. We immediately comply and remove // the component handle as well - let port_handle = comp_ctx.get_port_handle(content.port_to_close); + let port_to_close = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_close); // We're closing the port, so we will always update the peer of the // port (in case of error messages) @@ -467,7 +464,6 @@ pub(crate) fn default_handle_control_message( port_info.peer_comp_id = message.sender_comp_id; port_info.close_at_sync_end = true; // might be redundant (we might set it closed now) - let port_info = comp_ctx.get_port(port_handle); let peer_comp_id = port_info.peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); @@ -481,6 +477,7 @@ pub(crate) fn default_handle_control_message( default_handle_ack(control, control_id, sched_ctx, comp_ctx); } else { // Respond to the message + let port_info = comp_ctx.get_port(port_handle); let last_instruction = port_info.last_instruction; let port_has_had_message = port_info.received_message_for_sync; default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); @@ -507,37 +504,43 @@ pub(crate) fn default_handle_control_message( )); } } else { - comp_ctx.set_port_state(port_handle, PortState::Closed); + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.state.set(PortStateFlag::Closed); } } }, - ControlMessageContent::UnblockPort(port_id) => { + ControlMessageContent::UnblockPort => { // We were previously blocked (or already closed) - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); + let port_to_unblock = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_unblock); + let port_info = comp_ctx.get_port_mut(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::BlockedDueToFullBuffers { - default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); - } + debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); + + port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers); + default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); }, - ControlMessageContent::PortPeerChangedBlock(port_id) => { + ControlMessageContent::PortPeerChangedBlock => { // The peer of our port has just changed. So we are asked to // temporarily block the port (while our original recipient is // potentially rerouting some of the in-flight messages) and // Ack. Then we wait for the `unblock` call. - debug_assert_eq!(message.target_port_id, Some(port_id)); - let port_handle = comp_ctx.get_port_handle(port_id); - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange); + let port_to_change = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_change); - let port_info = comp_ctx.get_port(port_handle); - let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let port_info = comp_ctx.get_port_mut(port_handle); + let peer_comp_id = port_info.peer_comp_id; + port_info.state.set(PortStateFlag::BlockedDueToPeerChange); + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); }, ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { - let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); + let port_to_change = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_change); let port_info = comp_ctx.get_port(port_handle); - debug_assert!(port_info.state == PortState::BlockedDueToPeerChange); + debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange)); let old_peer_id = port_info.peer_comp_id; comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); @@ -545,8 +548,10 @@ pub(crate) fn default_handle_control_message( let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_comp_id = new_comp_id; port_info.peer_port_id = new_port_id; + + port_info.state.clear(PortStateFlag::BlockedDueToPeerChange); comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); - default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); } } @@ -615,14 +620,14 @@ pub(crate) fn default_handle_start_exit( // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); - if port.state == PortState::Closed || port.close_at_sync_end { + if port.state.is_closed() || port.close_at_sync_end { // Already closed, or in the process of being closed continue; } // Mark as closed let port_id = port.self_id; - port.state = PortState::Closed; + port.state.set(PortStateFlag::Closed); // Notify peer of closing let port_handle = comp_ctx.get_port_handle(port_id); @@ -687,7 +692,7 @@ pub(crate) fn default_handle_sync_decision( for port_index in 0..comp_ctx.num_ports() { let port_info = comp_ctx.get_port_by_index_mut(port_index); if port_info.close_at_sync_end { - port_info.state = PortState::Closed; + port_info.state.set(PortStateFlag::Closed); } } debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); @@ -789,14 +794,13 @@ fn default_send_ack( /// Handles the unblocking of a putter port. In case there is a pending message /// on that port then it will be sent. -fn default_handle_unblock_put( +fn default_handle_recently_unblocked_port( exec_state: &mut CompExecState, consensus: &mut Consensus, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, ) { let port_info = comp_ctx.get_port_mut(port_handle); let port_id = port_info.self_id; - debug_assert!(port_info.state.is_blocked()); - port_info.state = PortState::Open; + debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller if exec_state.is_blocked_on_put(port_id) { // Annotate the message that we're going to send