[PH2][WritePath] Track memory of all the frames returned by a single Dequeue call from StreamDataQueue. (#40619)
Closes #40619
COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/40619 from ac-patel:write2 8f3b0848dd54ac51fa336d7eed9b6686632dbbf3
PiperOrigin-RevId: 803507536
diff --git a/src/core/ext/transport/chttp2/transport/stream_data_queue.h b/src/core/ext/transport/chttp2/transport/stream_data_queue.h
index 981ff86..8ca6cfa 100644
--- a/src/core/ext/transport/chttp2/transport/stream_data_queue.h
+++ b/src/core/ext/transport/chttp2/transport/stream_data_queue.h
@@ -367,6 +367,9 @@
std::vector<Http2Frame> frames;
bool is_writable;
WritableStreams::StreamPriority priority;
+ // Maybe not be extremely accurate but should be good enough for our
+ // purposes.
+ size_t total_bytes_consumed;
};
// TODO(akshitpatel) : [PH2][P4] : Measure the performance of this function
@@ -421,7 +424,8 @@
GRPC_STREAM_DATA_QUEUE_DEBUG << "Stream id: " << stream_id_
<< " writable state changed to "
<< is_writable_;
- return DequeueResult{handle_dequeue.GetFrames(), is_writable_, priority_};
+ return DequeueResult{handle_dequeue.GetFrames(), is_writable_, priority_,
+ handle_dequeue.GetTotalBytesConsumed()};
}
// Returns true if the queue is empty. This function is thread safe.
@@ -500,6 +504,8 @@
return std::move(frames_);
}
+ size_t GetTotalBytesConsumed() const { return total_bytes_consumed_; }
+
private:
inline void MaybeAppendInitialMetadataFrames() {
while (queue_.initial_metadata_disassembler_.HasMoreData()) {
@@ -508,7 +514,7 @@
// TODO(akshitpatel) : [PH2][P2] : I do not think we need this.
// HasMoreData() should be enough.
bool is_end_headers = false;
- frames_.emplace_back(queue_.initial_metadata_disassembler_.GetNextFrame(
+ AppendFrame(queue_.initial_metadata_disassembler_.GetNextFrame(
max_frame_length_, is_end_headers));
}
}
@@ -522,9 +528,8 @@
// TODO(akshitpatel) : [PH2][P2] : I do not think we need this.
// HasMoreData() should be enough.
bool is_end_headers = false;
- frames_.emplace_back(
- queue_.trailing_metadata_disassembler_.GetNextFrame(
- max_frame_length_, is_end_headers));
+ AppendFrame(queue_.trailing_metadata_disassembler_.GetNextFrame(
+ max_frame_length_, is_end_headers));
}
}
@@ -535,9 +540,9 @@
0u);
DCHECK_EQ(queue_.trailing_metadata_disassembler_.GetBufferedLength(),
0u);
- frames_.emplace_back(Http2DataFrame{/*stream_id=*/queue_.stream_id_,
- /*end_stream=*/true,
- /*payload=*/SliceBuffer()});
+ AppendFrame(Http2DataFrame{/*stream_id=*/queue_.stream_id_,
+ /*end_stream=*/true,
+ /*payload=*/SliceBuffer()});
}
}
@@ -553,7 +558,7 @@
GRPC_STREAM_DATA_QUEUE_DEBUG
<< "Appending message frame with length " << frame.payload.Length()
<< "Available tokens: " << fc_tokens_available_;
- frames_.emplace_back(std::move(frame));
+ AppendFrame(std::move(frame));
}
}
@@ -566,11 +571,15 @@
0u);
DCHECK_EQ(queue_.trailing_metadata_disassembler_.GetBufferedLength(),
0u);
- frames_.emplace_back(
- Http2RstStreamFrame{queue_.stream_id_, error_code_});
+ AppendFrame(Http2RstStreamFrame{queue_.stream_id_, error_code_});
}
}
+ inline void AppendFrame(Http2Frame&& frame) {
+ total_bytes_consumed_ += GetFrameMemoryUsage(frame);
+ frames_.emplace_back(std::move(frame));
+ }
+
StreamDataQueue& queue_;
const uint32_t max_frame_length_;
const uint32_t max_fc_tokens_;
@@ -580,6 +589,7 @@
uint32_t error_code_ = static_cast<uint32_t>(Http2ErrorCode::kNoError);
std::vector<Http2Frame> frames_;
HPackCompressor& encoder_;
+ size_t total_bytes_consumed_ = 0u;
};
// Updates the stream priority. Also sets the writable state to true if the