[PH2][WritePath] Track memory of all the frames returned by a single Dequeue call from StreamDataQueue. (#40619)

Closes #40619

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/40619 from ac-patel:write2 8f3b0848dd54ac51fa336d7eed9b6686632dbbf3
PiperOrigin-RevId: 803507536
diff --git a/src/core/ext/transport/chttp2/transport/stream_data_queue.h b/src/core/ext/transport/chttp2/transport/stream_data_queue.h
index 981ff86..8ca6cfa 100644
--- a/src/core/ext/transport/chttp2/transport/stream_data_queue.h
+++ b/src/core/ext/transport/chttp2/transport/stream_data_queue.h
@@ -367,6 +367,9 @@
     std::vector<Http2Frame> frames;
     bool is_writable;
     WritableStreams::StreamPriority priority;
+    // Maybe not be extremely accurate but should be good enough for our
+    // purposes.
+    size_t total_bytes_consumed;
   };
 
   // TODO(akshitpatel) : [PH2][P4] : Measure the performance of this function
@@ -421,7 +424,8 @@
     GRPC_STREAM_DATA_QUEUE_DEBUG << "Stream id: " << stream_id_
                                  << " writable state changed to "
                                  << is_writable_;
-    return DequeueResult{handle_dequeue.GetFrames(), is_writable_, priority_};
+    return DequeueResult{handle_dequeue.GetFrames(), is_writable_, priority_,
+                         handle_dequeue.GetTotalBytesConsumed()};
   }
 
   // Returns true if the queue is empty. This function is thread safe.
@@ -500,6 +504,8 @@
       return std::move(frames_);
     }
 
+    size_t GetTotalBytesConsumed() const { return total_bytes_consumed_; }
+
    private:
     inline void MaybeAppendInitialMetadataFrames() {
       while (queue_.initial_metadata_disassembler_.HasMoreData()) {
@@ -508,7 +514,7 @@
         // TODO(akshitpatel) : [PH2][P2] : I do not think we need this.
         // HasMoreData() should be enough.
         bool is_end_headers = false;
-        frames_.emplace_back(queue_.initial_metadata_disassembler_.GetNextFrame(
+        AppendFrame(queue_.initial_metadata_disassembler_.GetNextFrame(
             max_frame_length_, is_end_headers));
       }
     }
@@ -522,9 +528,8 @@
         // TODO(akshitpatel) : [PH2][P2] : I do not think we need this.
         // HasMoreData() should be enough.
         bool is_end_headers = false;
-        frames_.emplace_back(
-            queue_.trailing_metadata_disassembler_.GetNextFrame(
-                max_frame_length_, is_end_headers));
+        AppendFrame(queue_.trailing_metadata_disassembler_.GetNextFrame(
+            max_frame_length_, is_end_headers));
       }
     }
 
@@ -535,9 +540,9 @@
                   0u);
         DCHECK_EQ(queue_.trailing_metadata_disassembler_.GetBufferedLength(),
                   0u);
-        frames_.emplace_back(Http2DataFrame{/*stream_id=*/queue_.stream_id_,
-                                            /*end_stream=*/true,
-                                            /*payload=*/SliceBuffer()});
+        AppendFrame(Http2DataFrame{/*stream_id=*/queue_.stream_id_,
+                                   /*end_stream=*/true,
+                                   /*payload=*/SliceBuffer()});
       }
     }
 
@@ -553,7 +558,7 @@
         GRPC_STREAM_DATA_QUEUE_DEBUG
             << "Appending message frame with length " << frame.payload.Length()
             << "Available tokens: " << fc_tokens_available_;
-        frames_.emplace_back(std::move(frame));
+        AppendFrame(std::move(frame));
       }
     }
 
@@ -566,11 +571,15 @@
                   0u);
         DCHECK_EQ(queue_.trailing_metadata_disassembler_.GetBufferedLength(),
                   0u);
-        frames_.emplace_back(
-            Http2RstStreamFrame{queue_.stream_id_, error_code_});
+        AppendFrame(Http2RstStreamFrame{queue_.stream_id_, error_code_});
       }
     }
 
+    inline void AppendFrame(Http2Frame&& frame) {
+      total_bytes_consumed_ += GetFrameMemoryUsage(frame);
+      frames_.emplace_back(std::move(frame));
+    }
+
     StreamDataQueue& queue_;
     const uint32_t max_frame_length_;
     const uint32_t max_fc_tokens_;
@@ -580,6 +589,7 @@
     uint32_t error_code_ = static_cast<uint32_t>(Http2ErrorCode::kNoError);
     std::vector<Http2Frame> frames_;
     HPackCompressor& encoder_;
+    size_t total_bytes_consumed_ = 0u;
   };
 
   // Updates the stream priority. Also sets the writable state to true if the