blob: a74b813c26cf7e5a13af93f6109468f6bf1bd50b [file] [log] [blame]
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <atomic>
#include <cstddef>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/memory_request.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
class BasicMemoryQuota;
class MemoryQuota;
using grpc_event_engine::experimental::MemoryRequest;

// Pull in impl under a different name to keep the gRPC/EventEngine separation
// clear.
// NOTE(review): the alias target line was lost in this copy; restored to the
// EventEngine internal allocator impl that GrpcMemoryAllocatorImpl derives
// from below — confirm against the EventEngine headers in use.
using EventEngineMemoryAllocatorImpl =
    grpc_event_engine::experimental::internal::MemoryAllocatorImpl;
using grpc_event_engine::experimental::MemoryAllocator;
template <typename T>
using Vector = grpc_event_engine::experimental::Vector<T>;
// Reclamation passes.
// When memory is tight, we start trying to claim some back from memory
// reclaimers. We do this in multiple passes: if there is a less destructive
// operation available, we do that, otherwise we do something more destructive.
enum class ReclamationPass {
  // Non-empty reclamation ought to take index 0, but to simplify API we don't
  // expose that publicly (it's an internal detail), and hence index zero is
  // here unnamed.

  // Benign reclamation is intended for reclamation steps that are not
  // observable outside of gRPC (besides maybe causing an increase in CPU
  // usage).
  // Examples of such reclamation would be resizing buffers to fit the current
  // load needs, rather than whatever was the peak usage requirement.
  kBenign = 1,
  // Idle reclamation is intended for reclamation steps that are observable
  // outside of gRPC, but do not cause application work to be lost.
  // Examples of such reclamation would be dropping channels that are not being
  // used.
  kIdle = 2,
  // Destructive reclamation is our last resort, and these reclamations are
  // allowed to drop work - such as cancelling in flight requests.
  kDestructive = 3,
};
// Number of reclamation passes in total: the unnamed non-empty pass (index 0)
// plus the three named passes above.
static constexpr size_t kNumReclamationPasses = 4;
// Maximum number of bytes an allocator may buffer locally before memory is
// pushed back to the shared quota (see GrpcMemoryAllocatorImpl::Release).
static constexpr size_t kMaxQuotaBufferSize = 1024 * 1024;
// For each reclamation function run we construct a ReclamationSweep.
// When this object is finally destroyed (it may be moved several times first),
// then that reclamation is complete and we may continue the reclamation loop.
class ReclamationSweep {
ReclamationSweep() = default;
ReclamationSweep(std::shared_ptr<BasicMemoryQuota> memory_quota,
uint64_t sweep_token, Waker waker)
: memory_quota_(std::move(memory_quota)),
waker_(std::move(waker)) {}
ReclamationSweep(const ReclamationSweep&) = delete;
ReclamationSweep& operator=(const ReclamationSweep&) = delete;
ReclamationSweep(ReclamationSweep&&) = default;
ReclamationSweep& operator=(ReclamationSweep&&) = default;
// Has enough work been done that we would not be called upon again
// immediately to do reclamation work if we stopped and requeued. Reclaimers
// with a variable amount of work to do can use this to ascertain when they
// can stop more efficiently than going through the reclaimer queue once per
// work item.
bool IsSufficient() const;
// Explicit finish for users that wish to write it.
// Just destroying the object is enough, but sometimes the additional
// explicitness is warranted.
void Finish() {
[](ReclamationSweep) {}(std::move(*this));
std::shared_ptr<BasicMemoryQuota> memory_quota_;
uint64_t sweep_token_;
Waker waker_;
class ReclaimerQueue {
struct QueuedNode;
struct State;
class Handle : public InternallyRefCounted<Handle> {
Handle() = default;
template <typename F>
explicit Handle(F reclaimer, std::shared_ptr<State> state)
: sweep_(new SweepFn<F>(std::move(reclaimer), std::move(state))) {}
~Handle() override {
GPR_DEBUG_ASSERT(sweep_.load(std::memory_order_relaxed) == nullptr);
Handle(const Handle&) = delete;
Handle& operator=(const Handle&) = delete;
void Orphan() final;
void Run(ReclamationSweep reclamation_sweep);
bool Requeue(ReclaimerQueue* new_queue);
friend class ReclaimerQueue;
using InternallyRefCounted<Handle>::Ref;
class Sweep {
virtual void RunAndDelete(absl::optional<ReclamationSweep> sweep) = 0;
explicit Sweep(std::shared_ptr<State> state) : state_(std::move(state)) {}
~Sweep() = default;
void MarkCancelled();
std::shared_ptr<State> state_;
template <typename F>
class SweepFn final : public Sweep {
explicit SweepFn(F&& f, std::shared_ptr<State> state)
: Sweep(std::move(state)), f_(std::move(f)) {}
void RunAndDelete(absl::optional<ReclamationSweep> sweep) override {
if (!sweep.has_value()) MarkCancelled();
delete this;
F f_;
std::atomic<Sweep*> sweep_{nullptr};
ReclaimerQueue(const ReclaimerQueue&) = delete;
ReclaimerQueue& operator=(const ReclaimerQueue&) = delete;
// Insert a new element at the back of the queue.
// If there is already an element from allocator at *index, then it is
// replaced with the new reclaimer and *index is unchanged. If there is not,
// then *index is set to the index of the newly queued entry.
// Associates the reclamation function with an allocator, and keeps that
// allocator alive, so that we can use the pointer as an ABA guard.
template <typename F>
GRPC_MUST_USE_RESULT OrphanablePtr<Handle> Insert(F reclaimer) {
auto p = MakeOrphanable<Handle>(std::move(reclaimer), state_);
return p;
// Poll to see if an entry is available: returns Pending if not, or the
// removed reclamation function if so.
Poll<RefCountedPtr<Handle>> PollNext();
// This callable is the promise backing Next - it resolves when there is an
// entry available. This really just redirects to calling PollNext().
class NextPromise {
explicit NextPromise(ReclaimerQueue* queue) : queue_(queue) {}
Poll<RefCountedPtr<Handle>> operator()() { return queue_->PollNext(); }
// Borrowed ReclaimerQueue backing this promise.
ReclaimerQueue* queue_;
GRPC_MUST_USE_RESULT NextPromise Next() { return NextPromise(this); }
void Enqueue(RefCountedPtr<Handle> handle);
std::shared_ptr<State> state_;
// Implementation backing MemoryQuota: tracks free/total bytes and drives
// reclamation when the quota enters overcommit.
class BasicMemoryQuota final
    : public std::enable_shared_from_this<BasicMemoryQuota> {
 public:
  explicit BasicMemoryQuota(std::string name) : name_(std::move(name)) {}

  // Start the reclamation activity.
  void Start();
  // Stop the reclamation activity.
  // Until reclamation is stopped, it's possible that circular references to the
  // BasicMemoryQuota remain. i.e. to guarantee deletion, a singular owning
  // object should call BasicMemoryQuota::Stop().
  void Stop();

  // Resize the quota to new_size.
  void SetSize(size_t new_size);
  // Forcefully take some memory from the quota, potentially entering
  // overcommit.
  void Take(size_t amount);
  // Finish reclamation pass.
  void FinishReclamation(uint64_t token, Waker waker);
  // Return some memory to the quota.
  void Return(size_t amount);
  // Instantaneous memory pressure approximation.
  std::pair<double, size_t>
  InstantaneousPressureAndMaxRecommendedAllocationSize() const;
  // Get a reclamation queue
  ReclaimerQueue* reclaimer_queue(size_t i) { return &reclaimers_[i]; }
  // The name of this quota
  absl::string_view name() const { return name_; }

 private:
  friend class ReclamationSweep;
  class WaitForSweepPromise;

  static constexpr intptr_t kInitialSize = std::numeric_limits<intptr_t>::max();
  // The amount of memory that's free in this quota.
  // We use intptr_t as a reasonable proxy for ssize_t that's portable.
  // We allow arbitrary overcommit and so this must allow negative values.
  std::atomic<intptr_t> free_bytes_{kInitialSize};
  // The total number of bytes in this quota.
  std::atomic<size_t> quota_size_{kInitialSize};
  // Reclaimer queues.
  ReclaimerQueue reclaimers_[kNumReclamationPasses];
  // The reclaimer activity consumes reclaimers whenever we are in overcommit to
  // try and get back under memory limits.
  ActivityPtr reclaimer_activity_;
  // Each time we do a reclamation sweep, we increment this counter and give it
  // to the sweep in question. In this way, should we choose to cancel a sweep
  // we can do so and not get confused when the sweep reports back that it's
  // completed.
  // We also increment this counter on completion of a sweep, as an indicator
  // that the wait has ended.
  std::atomic<uint64_t> reclamation_counter_{0};
  // The name of this quota - used for debugging/tracing/etc..
  std::string name_;
};
// MemoryAllocatorImpl grants the owner the ability to allocate memory from an
// underlying resource quota.
class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
explicit GrpcMemoryAllocatorImpl(
std::shared_ptr<BasicMemoryQuota> memory_quota, std::string name);
~GrpcMemoryAllocatorImpl() override;
// Rebind - Swaps the underlying quota for this allocator, taking care to
// make sure memory allocated is moved to allocations against the new quota.
void Rebind(std::shared_ptr<BasicMemoryQuota> memory_quota)
// Reserve bytes from the quota.
// If we enter overcommit, reclamation will begin concurrently.
// Returns the number of bytes reserved.
size_t Reserve(MemoryRequest request) override;
// Release some bytes that were previously reserved.
void Release(size_t n) override {
// Add the released memory to our free bytes counter... if this increases
// from 0 to non-zero, then we have more to do, otherwise, we're actually
// done.
size_t prev_free = free_bytes_.fetch_add(n, std::memory_order_release);
if (prev_free + n > kMaxQuotaBufferSize) {
// Try to immediately return some free'ed memory back to the total quota.
if (prev_free != 0) return;
// Post a reclamation function.
template <typename F>
void PostReclaimer(ReclamationPass pass, F fn) {
MutexLock lock(&memory_quota_mu_);
InsertReclaimer(static_cast<size_t>(pass), std::move(fn));
// Shutdown the allocator.
void Shutdown() override;
// Read the instantaneous memory pressure
double InstantaneousPressure() const {
MutexLock lock(&memory_quota_mu_);
return memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize()
// Name of this allocator
absl::string_view name() const { return name_; }
// Primitive reservation function.
absl::optional<size_t> TryReserve(MemoryRequest request) GRPC_MUST_USE_RESULT;
// This function may be invoked during a memory release operation. If the
// total free_bytes in this allocator/local cache exceeds
// kMaxQuotaBufferSize / 2, donate the excess free_bytes in this cache back
// to the total quota immediately. This helps prevent free bytes in any
// particular allocator from growing too large.
void MaybeDonateBack();
// Replenish bytes from the quota, without blocking, possibly entering
// overcommit.
void Replenish() ABSL_LOCKS_EXCLUDED(memory_quota_mu_);
// If we have not already, register a reclamation function against the quota
// to sweep any free memory back to that quota.
void MaybeRegisterReclaimer() ABSL_LOCKS_EXCLUDED(memory_quota_mu_);
void MaybeRegisterReclaimerLocked()
template <typename F>
void InsertReclaimer(size_t pass, F fn)
reclamation_handles_[pass] =
// Amount of memory this allocator has cached for its own use: to avoid quota
// contention, each MemoryAllocator can keep some memory in addition to what
// it is immediately using, and the quota can pull it back under memory
// pressure.
std::atomic<size_t> free_bytes_{0};
// Mutex guarding the backing resource quota.
mutable Mutex memory_quota_mu_;
// Backing resource quota.
std::shared_ptr<BasicMemoryQuota> memory_quota_
// Amount of memory taken from the quota by this allocator.
size_t taken_bytes_ ABSL_GUARDED_BY(memory_quota_mu_) =
bool shutdown_ ABSL_GUARDED_BY(memory_quota_mu_) = false;
bool registered_reclaimer_ ABSL_GUARDED_BY(memory_quota_mu_) = false;
// Indices into the various reclaimer queues, used so that we can cancel
// reclamation should we shutdown or get rebound.
reclamation_handles_[kNumReclamationPasses] ABSL_GUARDED_BY(
// Name of this allocator.
std::string name_;
// MemoryOwner is an enhanced MemoryAllocator that can also reclaim memory, and
// be rebound to a different memory quota.
// Different modules should not share a MemoryOwner between themselves, instead
// each module that requires a MemoryOwner should create one from a resource
// quota. This is because the MemoryOwner reclaimers are tied to the
// MemoryOwner's lifetime, and are not queryable, so passing a MemoryOwner to a
// new owning module means that module cannot reason about which reclaimers are
// active, nor what they might do.
class MemoryOwner final : public MemoryAllocator {
MemoryOwner() = default;
explicit MemoryOwner(std::shared_ptr<GrpcMemoryAllocatorImpl> allocator)
: MemoryAllocator(std::move(allocator)) {}
// Post a reclaimer for some reclamation pass.
template <typename F>
void PostReclaimer(ReclamationPass pass, F fn) {
impl()->PostReclaimer(pass, std::move(fn));
// Rebind to a different quota.
void Rebind(MemoryQuota* quota);
// Instantaneous memory pressure in the underlying quota.
double InstantaneousPressure() const {
return impl()->InstantaneousPressure();
template <typename T, typename... Args>
OrphanablePtr<T> MakeOrphanable(Args&&... args) {
return OrphanablePtr<T>(New<T>(std::forward<Args>(args)...));
// Name of this object
absl::string_view name() const { return impl()->name(); }
// Is this object valid (ie has not been moved out of or reset)
bool is_valid() const { return impl() != nullptr; }
const GrpcMemoryAllocatorImpl* impl() const {
return static_cast<const GrpcMemoryAllocatorImpl*>(get_internal_impl_ptr());
GrpcMemoryAllocatorImpl* impl() {
return static_cast<GrpcMemoryAllocatorImpl*>(get_internal_impl_ptr());
// MemoryQuota tracks the amount of memory available as part of a ResourceQuota.
class MemoryQuota final
: public grpc_event_engine::experimental::MemoryAllocatorFactory {
explicit MemoryQuota(std::string name)
: memory_quota_(std::make_shared<BasicMemoryQuota>(std::move(name))) {
~MemoryQuota() override {
if (memory_quota_ != nullptr) memory_quota_->Stop();
MemoryQuota(const MemoryQuota&) = delete;
MemoryQuota& operator=(const MemoryQuota&) = delete;
MemoryQuota(MemoryQuota&&) = default;
MemoryQuota& operator=(MemoryQuota&&) = default;
MemoryAllocator CreateMemoryAllocator(absl::string_view name) override;
MemoryOwner CreateMemoryOwner(absl::string_view name);
// Resize the quota to new_size.
void SetSize(size_t new_size) { memory_quota_->SetSize(new_size); }
// Return true if the instantaneous memory pressure is high.
bool IsMemoryPressureHigh() const {
static constexpr double kMemoryPressureHighThreshold = 0.9;
return memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize()
.first > kMemoryPressureHighThreshold;
friend class MemoryOwner;
std::shared_ptr<BasicMemoryQuota> memory_quota_;
using MemoryQuotaRefPtr = std::shared_ptr<MemoryQuota>;
inline MemoryQuotaRefPtr MakeMemoryQuota(std::string name) {
return std::make_shared<MemoryQuota>(std::move(name));
} // namespace grpc_core