// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/resource_quota/memory_quota.h"
#include <inttypes.h>
#include <algorithm>
#include <atomic>
#include <tuple>
#include <type_traits>
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/utility/utility.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/trace.h"
namespace grpc_core {
// Maximum number of bytes an allocator will request from a quota in one step.
// Larger allocations than this will require multiple allocation requests.
static constexpr size_t kMaxReplenishBytes = 1024 * 1024;
// Minimum number of bytes an allocator will request from a quota in one step.
static constexpr size_t kMinReplenishBytes = 4096;
//
// Reclaimer
//
ReclamationSweep::~ReclamationSweep() {
if (memory_quota_ != nullptr) {
memory_quota_->FinishReclamation(sweep_token_, std::move(waker_));
}
}
//
// ReclaimerQueue
//
struct ReclaimerQueue::QueuedNode
: public MultiProducerSingleConsumerQueue::Node {
explicit QueuedNode(RefCountedPtr<Handle> reclaimer_handle)
: reclaimer_handle(std::move(reclaimer_handle)) {}
RefCountedPtr<Handle> reclaimer_handle;
};
struct ReclaimerQueue::State {
Mutex reader_mu;
MultiProducerSingleConsumerQueue queue; // reader_mu must be held to pop
Waker waker ABSL_GUARDED_BY(reader_mu);
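  // On destruction, drain the queue and delete any reclaimer nodes that were
  // never popped.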
~State() {
bool empty = false;
do {
delete static_cast<QueuedNode*>(queue.PopAndCheckEnd(&empty));
} while (!empty);
}
};
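// A Handle's sweep_ pointer is exchanged to nullptr before use, so the
// underlying Sweep runs at most once: either cancelled via Orphan() (invoked
// with nullopt) or executed via Run() (invoked with a live ReclamationSweep).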
void ReclaimerQueue::Handle::Orphan() {
if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
sweep->RunAndDelete(absl::nullopt);
}
Unref();
}
void ReclaimerQueue::Handle::Run(ReclamationSweep reclamation_sweep) {
if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
sweep->RunAndDelete(std::move(reclamation_sweep));
}
}
bool ReclaimerQueue::Handle::Requeue(ReclaimerQueue* new_queue) {
if (sweep_.load(std::memory_order_relaxed)) {
new_queue->Enqueue(Ref());
return true;
} else {
return false;
}
}
void ReclaimerQueue::Handle::Sweep::MarkCancelled() {
  // When we cancel a reclaimer we pop cancelled nodes off the head of the
  // queue (deleting them as we go), and move the first non-cancelled node we
  // find to the back. This avoids head-of-line blocking, whereby a
  // non-cancelled reclaimer at the head of the queue, in the absence of
  // memory pressure, would prevent the remainder of the queue from being
  // cleaned up.
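  // For example, with cancelled nodes written X: starting from
  // [X1, A, X2, B], we delete X1, find live node A and push it to the back,
  // leaving [X2, B, A]; a later cancellation pass can then clean up X2.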
MutexLock lock(&state_->reader_mu);
while (true) {
bool empty = false;
std::unique_ptr<QueuedNode> node(
static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
if (node == nullptr) break;
if (node->reclaimer_handle->sweep_.load(std::memory_order_relaxed) !=
nullptr) {
state_->queue.Push(node.release());
break;
}
}
}
ReclaimerQueue::ReclaimerQueue() : state_(std::make_shared<State>()) {}
ReclaimerQueue::~ReclaimerQueue() = default;
void ReclaimerQueue::Enqueue(RefCountedPtr<Handle> handle) {
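  // Push() returns true if this node was possibly the first in the queue;
  // only then can a reader be parked in PollNext() awaiting a wakeup, and a
  // spurious true merely causes a harmless extra wakeup.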
if (state_->queue.Push(new QueuedNode(std::move(handle)))) {
MutexLock lock(&state_->reader_mu);
state_->waker.Wakeup();
}
}
Poll<RefCountedPtr<ReclaimerQueue::Handle>> ReclaimerQueue::PollNext() {
MutexLock lock(&state_->reader_mu);
bool empty = false;
// Try to pull from the queue.
std::unique_ptr<QueuedNode> node(
static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
// If we get something, great.
if (node != nullptr) return std::move(node->reclaimer_handle);
if (!empty) {
    // If we don't, but the queue is not empty (a concurrent push is still in
    // flight), schedule an immediate repoll: the node will be visible soon.
Activity::current()->ForceImmediateRepoll();
} else {
// Otherwise, schedule a wakeup for whenever something is pushed.
state_->waker = Activity::current()->MakeNonOwningWaker();
}
return Pending{};
}
//
// GrpcMemoryAllocatorImpl
//
GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl(
std::shared_ptr<BasicMemoryQuota> memory_quota, std::string name)
: memory_quota_(memory_quota), name_(std::move(name)) {
memory_quota_->Take(taken_bytes_);
}
GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
GPR_ASSERT(free_bytes_.load(std::memory_order_acquire) +
sizeof(GrpcMemoryAllocatorImpl) ==
taken_bytes_);
memory_quota_->Return(taken_bytes_);
}
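// Mark the allocator shut down and release any registered reclaimers. The
// handles (and the quota reference) are moved out from under the lock but
// destroyed only after it is released, since orphaning a handle can run its
// reclaimer callback.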
void GrpcMemoryAllocatorImpl::Shutdown() {
std::shared_ptr<BasicMemoryQuota> memory_quota;
OrphanablePtr<ReclaimerQueue::Handle>
reclamation_handles[kNumReclamationPasses];
{
MutexLock lock(&memory_quota_mu_);
GPR_ASSERT(!shutdown_);
shutdown_ = true;
memory_quota = memory_quota_;
for (size_t i = 0; i < kNumReclamationPasses; i++) {
reclamation_handles[i] = absl::exchange(reclamation_handles_[i], nullptr);
}
}
}
size_t GrpcMemoryAllocatorImpl::Reserve(MemoryRequest request) {
// Validate request - performed here so we don't bloat the generated code with
// inlined asserts.
GPR_ASSERT(request.min() <= request.max());
GPR_ASSERT(request.max() <= MemoryRequest::max_allowed_size());
while (true) {
// Attempt to reserve memory from our pool.
auto reservation = TryReserve(request);
if (reservation.has_value()) {
return *reservation;
}
// If that failed, grab more from the quota and retry.
Replenish();
}
}
absl::optional<size_t> GrpcMemoryAllocatorImpl::TryReserve(
MemoryRequest request) {
// How much memory should we request? (see the scaling below)
size_t scaled_size_over_min = request.max() - request.min();
// Scale the request down according to memory pressure if we have that
// flexibility.
if (scaled_size_over_min != 0) {
double pressure;
size_t max_recommended_allocation_size;
{
MutexLock lock(&memory_quota_mu_);
const auto pressure_and_max_recommended_allocation_size =
memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize();
pressure = pressure_and_max_recommended_allocation_size.first;
max_recommended_allocation_size =
pressure_and_max_recommended_allocation_size.second;
}
    // If pressure exceeds 80% usage, scale the allocation down linearly
    // towards the minimum as pressure approaches 100%.
if (pressure > 0.8) {
scaled_size_over_min =
std::min(scaled_size_over_min,
static_cast<size_t>((request.max() - request.min()) *
(1.0 - pressure) / 0.2));
}
if (max_recommended_allocation_size < request.min()) {
scaled_size_over_min = 0;
} else if (request.min() + scaled_size_over_min >
max_recommended_allocation_size) {
scaled_size_over_min = max_recommended_allocation_size - request.min();
}
}
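  // For example, at pressure 0.9 a request with min 1 KiB and max 9 KiB has
  // its 8 KiB of flexibility halved ((1.0 - 0.9) / 0.2 == 0.5), so we aim to
  // reserve 1 KiB + 4 KiB = 5 KiB (the recommended maximum allocation size
  // checks above may cap this further).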
// How much do we want to reserve?
const size_t reserve = request.min() + scaled_size_over_min;
// See how many bytes are available.
size_t available = free_bytes_.load(std::memory_order_acquire);
while (true) {
// Does the current free pool satisfy the request?
if (available < reserve) {
return {};
}
// Try to reserve the requested amount.
// If the amount of free memory changed through this loop, then available
// will be set to the new value and we'll repeat.
if (free_bytes_.compare_exchange_weak(available, available - reserve,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
return reserve;
}
}
}
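// If our local free pool has grown beyond half of kMaxQuotaBufferSize, donate
// the excess back to the quota so that other allocators can use it.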
void GrpcMemoryAllocatorImpl::MaybeDonateBack() {
size_t free = free_bytes_.load(std::memory_order_relaxed);
const size_t kReduceToSize = kMaxQuotaBufferSize / 2;
while (true) {
if (free <= kReduceToSize) return;
size_t ret = free - kReduceToSize;
if (free_bytes_.compare_exchange_weak(free, kReduceToSize,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO, "[%p|%s] Early return %zu bytes", this,
                name_.c_str(), ret);
}
MutexLock lock(&memory_quota_mu_);
GPR_ASSERT(taken_bytes_ >= ret);
taken_bytes_ -= ret;
memory_quota_->Return(ret);
return;
}
}
}
void GrpcMemoryAllocatorImpl::Replenish() {
MutexLock lock(&memory_quota_mu_);
GPR_ASSERT(!shutdown_);
  // Grow the request size exponentially at a fairly low rate (a third of what
  // we have taken so far), bounded between the limits declared at the top of
  // this file.
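  // For example, taken_bytes_ == 300 KiB yields a 100 KiB request; the result
  // is always clamped to [kMinReplenishBytes, kMaxReplenishBytes].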
auto amount = Clamp(taken_bytes_ / 3, kMinReplenishBytes, kMaxReplenishBytes);
// Take the requested amount from the quota.
memory_quota_->Take(amount);
// Record that we've taken it.
taken_bytes_ += amount;
// Add the taken amount to the free pool.
free_bytes_.fetch_add(amount, std::memory_order_acq_rel);
// See if we can add ourselves as a reclaimer.
MaybeRegisterReclaimerLocked();
}
void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() {
MutexLock lock(&memory_quota_mu_);
MaybeRegisterReclaimerLocked();
}
void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimerLocked() {
// If the reclaimer is already registered, then there's nothing to do.
if (registered_reclaimer_) return;
if (shutdown_) return;
  // Grab a weak reference to ourselves: the reclaimer callback must not keep
  // the allocator alive on its own, so it only upgrades the weak_ptr if the
  // allocator still exists when the callback actually runs.
auto self = shared_from_this();
std::weak_ptr<EventEngineMemoryAllocatorImpl> self_weak{self};
registered_reclaimer_ = true;
InsertReclaimer(0, [self_weak](absl::optional<ReclamationSweep> sweep) {
if (!sweep.has_value()) return;
auto self = self_weak.lock();
if (self == nullptr) return;
auto* p = static_cast<GrpcMemoryAllocatorImpl*>(self.get());
MutexLock lock(&p->memory_quota_mu_);
p->registered_reclaimer_ = false;
// Figure out how many bytes we can return to the quota.
size_t return_bytes = p->free_bytes_.exchange(0, std::memory_order_acq_rel);
if (return_bytes == 0) return;
// Subtract that from our outstanding balance.
p->taken_bytes_ -= return_bytes;
// And return them to the quota.
p->memory_quota_->Return(return_bytes);
});
}
void GrpcMemoryAllocatorImpl::Rebind(
std::shared_ptr<BasicMemoryQuota> memory_quota) {
MutexLock lock(&memory_quota_mu_);
GPR_ASSERT(!shutdown_);
if (memory_quota_ == memory_quota) return;
// Return memory to the original memory quota.
memory_quota_->Return(taken_bytes_);
  // Requeue any registered reclaimers onto the new quota's reclaimer queues.
for (size_t i = 0; i < kNumReclamationPasses; i++) {
if (reclamation_handles_[i] != nullptr) {
reclamation_handles_[i]->Requeue(memory_quota->reclaimer_queue(i));
}
}
  // Switch to the new memory quota, leaving the old one in the memory_quota
  // parameter so that it is unreffed only after the lock has been released.
memory_quota_.swap(memory_quota);
  // Drop our free memory to zero, to avoid needing to ask the new quota for
  // memory we're not currently using.
taken_bytes_ -= free_bytes_.exchange(0, std::memory_order_acq_rel);
// And let the new quota know how much we're already using.
memory_quota_->Take(taken_bytes_);
}
//
// MemoryOwner
//
void MemoryOwner::Rebind(MemoryQuota* quota) {
impl()->Rebind(quota->memory_quota_);
}
//
// BasicMemoryQuota
//
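// Promise that stays pending until the reclamation sweep identified by
// `token` has finished, i.e. until reclamation_counter_ has advanced past the
// token (which FinishReclamation does when the sweep is destroyed).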
class BasicMemoryQuota::WaitForSweepPromise {
public:
WaitForSweepPromise(std::shared_ptr<BasicMemoryQuota> memory_quota,
uint64_t token)
: memory_quota_(std::move(memory_quota)), token_(token) {}
struct Empty {};
Poll<Empty> operator()() {
if (memory_quota_->reclamation_counter_.load(std::memory_order_relaxed) !=
token_) {
return Empty{};
} else {
return Pending{};
}
}
private:
std::shared_ptr<BasicMemoryQuota> memory_quota_;
uint64_t token_;
};
void BasicMemoryQuota::Start() {
auto self = shared_from_this();
// Reclamation loop:
  // basically, wait until we are in overcommit (free_bytes_ <= 0), and then:
  //   while (free_bytes_ <= 0) reclaim_memory()
// ... and repeat
auto reclamation_loop = Loop(Seq(
[self]() -> Poll<int> {
// If there's free memory we no longer need to reclaim memory!
if (self->free_bytes_.load(std::memory_order_acquire) > 0) {
return Pending{};
}
return 0;
},
[self]() {
// Race biases to the first thing that completes... so this will
// choose the highest priority/least destructive thing to do that's
// available.
auto annotate = [](const char* name) {
return [name](RefCountedPtr<ReclaimerQueue::Handle> f) {
return std::make_tuple(name, std::move(f));
};
};
return Race(Map(self->reclaimers_[0].Next(), annotate("compact")),
Map(self->reclaimers_[1].Next(), annotate("benign")),
Map(self->reclaimers_[2].Next(), annotate("idle")),
Map(self->reclaimers_[3].Next(), annotate("destructive")));
},
[self](
std::tuple<const char*, RefCountedPtr<ReclaimerQueue::Handle>> arg) {
auto reclaimer = std::move(std::get<1>(arg));
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
double free = std::max(intptr_t(0), self->free_bytes_.load());
size_t quota_size = self->quota_size_.load();
gpr_log(GPR_INFO,
"RQ: %s perform %s reclamation. Available free bytes: %f, "
"total quota_size: %zu",
self->name_.c_str(), std::get<0>(arg), free, quota_size);
}
          // One of the reclaimer queues gave us a way to get back memory.
          // Run the reclaimer with a sweep carrying a token and a waker, so
          // that we get woken again once the sweep completes.
const uint64_t token =
self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) +
1;
reclaimer->Run(ReclamationSweep(
self, token, Activity::current()->MakeNonOwningWaker()));
          // Return a promise that will wait for our barrier. It is awoken by
          // the sweep above being destroyed, which bumps the reclamation
          // counter past our token; once that happens we can proceed.
return WaitForSweepPromise(self, token);
},
[]() -> LoopCtl<absl::Status> {
// Continue the loop!
return Continue{};
}));
reclaimer_activity_ =
MakeActivity(std::move(reclamation_loop), ExecCtxWakeupScheduler(),
[](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled);
});
}
void BasicMemoryQuota::Stop() { reclaimer_activity_.reset(); }
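// Resizing the quota adjusts the free pool by the delta: growing returns the
// extra bytes, shrinking takes them back (possibly pushing the quota into
// overcommit and thereby triggering reclamation).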
void BasicMemoryQuota::SetSize(size_t new_size) {
size_t old_size = quota_size_.exchange(new_size, std::memory_order_relaxed);
if (old_size < new_size) {
// We're growing the quota.
Return(new_size - old_size);
} else {
// We're shrinking the quota.
Take(old_size - new_size);
}
}
void BasicMemoryQuota::Take(size_t amount) {
// If there's a request for nothing, then do nothing!
if (amount == 0) return;
GPR_DEBUG_ASSERT(amount <= std::numeric_limits<intptr_t>::max());
// Grab memory from the quota.
auto prior = free_bytes_.fetch_sub(amount, std::memory_order_acq_rel);
// If we push into overcommit, awake the reclaimer.
if (prior >= 0 && prior < static_cast<intptr_t>(amount)) {
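    // e.g. prior == 100 with amount == 150 leaves free_bytes_ at -50: this
    // Take is the one that crossed zero, so it alone wakes the reclaimer.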
if (reclaimer_activity_ != nullptr) reclaimer_activity_->ForceWakeup();
}
}
void BasicMemoryQuota::FinishReclamation(uint64_t token, Waker waker) {
uint64_t current = reclamation_counter_.load(std::memory_order_relaxed);
if (current != token) return;
if (reclamation_counter_.compare_exchange_strong(current, current + 1,
std::memory_order_relaxed,
std::memory_order_relaxed)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
double free = std::max(intptr_t(0), free_bytes_.load());
size_t quota_size = quota_size_.load();
gpr_log(GPR_INFO,
"RQ: %s reclamation complete. Available free bytes: %f, "
"total quota_size: %zu",
name_.c_str(), free, quota_size);
}
waker.Wakeup();
}
}
void BasicMemoryQuota::Return(size_t amount) {
free_bytes_.fetch_add(amount, std::memory_order_relaxed);
}
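// Returns the instantaneous memory pressure in [0, 1] (the fraction of the
// quota currently in use) together with a recommended maximum allocation size
// of 1/16 of the quota. For example, a 64 MiB quota with 16 MiB free reports
// pressure 0.75 and recommends allocating at most 4 MiB at a time.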
std::pair<double, size_t>
BasicMemoryQuota::InstantaneousPressureAndMaxRecommendedAllocationSize() const {
double free = free_bytes_.load();
if (free < 0) free = 0;
size_t quota_size = quota_size_.load();
double size = quota_size;
if (size < 1) return std::make_pair(1.0, 1);
double pressure = (size - free) / size;
if (pressure < 0.0) pressure = 0.0;
if (pressure > 1.0) pressure = 1.0;
return std::make_pair(pressure, quota_size / 16);
}
//
// MemoryQuota
//
MemoryAllocator MemoryQuota::CreateMemoryAllocator(absl::string_view name) {
auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(
memory_quota_, absl::StrCat(memory_quota_->name(), "/allocator/", name));
return MemoryAllocator(std::move(impl));
}
MemoryOwner MemoryQuota::CreateMemoryOwner(absl::string_view name) {
auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(
memory_quota_, absl::StrCat(memory_quota_->name(), "/owner/", name));
return MemoryOwner(std::move(impl));
}
} // namespace grpc_core