// blob: 61e9aeae9bcbb7606321c049b97edd4889302382 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_MESSAGE_QUOTA_CHECKER_H_
#define MOJO_PUBLIC_CPP_BINDINGS_LIB_MESSAGE_QUOTA_CHECKER_H_
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <memory>

#include "base/component_export.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "base/thread_annotations.h"
#include "base/time/time.h"
#include "base/types/pass_key.h"
#include "mojo/public/cpp/system/message_pipe.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
namespace mojo {
namespace internal {
// This class keeps track of how many messages are in-flight for a message pipe,
// including messages that are posted or locally queued.
//
// Message pipe owners may have reason to implement their own mechanism for
// queuing outgoing messages before writing them to a pipe. This class helps
// with unread message quota monitoring in such cases, since Mojo's own
// quota monitoring on the pipe cannot account for such external queues.
// Callers are responsible for invoking |BeforeMessagesEnqueued()| and
// |AfterMessagesDequeued()| when making respective changes to their local
// outgoing queue. Additionally, |BeforeWrite()| should be called immediately
// before writing each message to the corresponding message pipe.
//
// Also note that messages posted to a different sequence with base::ThreadPool
// and the like need to be treated as locally queued. Task queues can grow
// arbitrarily long, so it's ideal to perform unread quota checks before
// posting.
//
// Either |BeforeMessagesEnqueued()| or |BeforeWrite()| may cause the quota
// to be exceeded, thus invoking the |maybe_crash_function| set in this
// object's Configuration.
class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) MessageQuotaChecker
    : public base::RefCountedThreadSafe<MessageQuotaChecker> {
 public:
  // A helper class to maintain a decaying average for the rate of events per
  // sampling interval over time.
  class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) DecayingRateAverage {
   public:
    DecayingRateAverage();

    // Accrues one event at time |when|. Note that |when| must increase
    // monotonically from one event to the next.
    void AccrueEvent(base::TimeTicks when);

    // Retrieves the current rate average, decayed to |when|.
    double GetDecayedRateAverage(base::TimeTicks when) const;

    // The length of a sampling interval (5 seconds).
    static constexpr base::TimeDelta kSamplingInterval = base::Seconds(5);

    // Returns the start of the sampling interval after the interval that
    // |when| falls into.
    static base::TimeTicks GetNextSamplingIntervalForTesting(
        base::TimeTicks when);

   private:
    // A new sample is weighed at this rate into the average, whereas the old
    // average is weighed at kDecayFactor^age; Note that
    // (kSampleWeight + kDecayFactor) == 1.0.
    static constexpr double kSampleWeight = 0.5;
    static constexpr double kDecayFactor = (1 - kSampleWeight);

    // The event count for the current or most recent sampling interval and
    // the ordinal sampling interval they correspond to.
    size_t events_ = 0;
    // Default-initialized like its siblings so that it never holds an
    // indeterminate value, whatever the out-of-line constructor does.
    int64_t events_sampling_interval_ = 0;

    // The so-far accrued average to |events_sampling_interval_|.
    double decayed_average_ = 0.0;
  };

  // Exposed for use in tests. Defined after this class.
  struct Configuration;

  // Returns a new instance if this invocation has been sampled for quota
  // checking.
  static scoped_refptr<MessageQuotaChecker> MaybeCreate();

  // Public for base::MakeRefCounted(). Use MaybeCreate(); the PassKey
  // parameter restricts construction to this class.
  MessageQuotaChecker(const Configuration* config,
                      base::PassKey<MessageQuotaChecker>);

  // Call before writing a message to |message_pipe_|.
  void BeforeWrite();

  // Call before queueing |num| messages.
  void BeforeMessagesEnqueued(size_t num);

  // Call after de-queueing |num| messages.
  void AfterMessagesDequeued(size_t num);

  // Returns the high watermark of quota usage observed by this instance.
  size_t GetMaxQuotaUsage();

  // Set or unset the message pipe associated with this quota checker.
  void SetMessagePipe(MessagePipeHandle message_pipe);

  // Test support.
  size_t GetCurrentQuotaStatusForTesting();
  static Configuration GetConfigurationForTesting();
  static scoped_refptr<MessageQuotaChecker> MaybeCreateForTesting(
      const Configuration& config);

 private:
  // RefCountedThreadSafe needs access to the private destructor.
  friend class base::RefCountedThreadSafe<MessageQuotaChecker>;
  ~MessageQuotaChecker();

  static Configuration GetConfiguration();
  static scoped_refptr<MessageQuotaChecker> MaybeCreateImpl(
      const Configuration& config);

  // Returns the amount of unread message quota currently used if there is
  // an associated message pipe.
  absl::optional<size_t> GetCurrentMessagePipeQuota();

  void QuotaCheckImpl(size_t num_enqueued);

  const Configuration* config_;

  // The time ticks when this instance was created.
  const base::TimeTicks creation_time_;

  // Cumulative counts for the number of messages enqueued with
  // |BeforeMessagesEnqueued()| and dequeued with |AfterMessagesDequeued()|,
  // plus a count of messages written.
  // NOTE(review): |messages_written_| presumably counts |BeforeWrite()| calls;
  // confirm against the implementation file.
  std::atomic<uint64_t> messages_enqueued_{0};
  std::atomic<uint64_t> messages_dequeued_{0};
  std::atomic<uint64_t> messages_written_{0};

  // Guards all state below here.
  base::Lock lock_;

  // A decaying average of the rate of call to BeforeWrite per second.
  DecayingRateAverage write_rate_average_ GUARDED_BY(lock_);

  // The locally consumed quota, e.g. the difference between the counts passed
  // to |BeforeMessagesEnqueued()| and |AfterMessagesDequeued()|.
  size_t consumed_quota_ GUARDED_BY(lock_) = 0u;

  // The high watermark consumed quota observed.
  size_t max_consumed_quota_ GUARDED_BY(lock_) = 0u;

  // The message pipe this instance observes, if any.
  MessagePipeHandle message_pipe_ GUARDED_BY(lock_);
};
// Settings consumed by MessageQuotaChecker. Every member carries a default so
// that a default-constructed Configuration is fully initialized.
// NOTE(review): the exact meaning of |sample_rate|,
// |unread_message_count_quota| and |crash_threshold| is defined by the
// implementation file, not by this header - confirm there before relying on
// specific units or semantics.
struct MessageQuotaChecker::Configuration {
  bool is_enabled = false;
  size_t sample_rate = 0u;
  size_t unread_message_count_quota = 0u;
  size_t crash_threshold = 0u;

  // Hook that MessageQuotaChecker may invoke when the quota is exceeded.
  // Defaults to nullptr rather than being left indeterminate; the struct
  // remains an aggregate, so brace-initialization keeps working.
  void (*maybe_crash_function)(size_t quota_used,
                               absl::optional<size_t> message_pipe_quota_used,
                               int64_t seconds_since_construction,
                               double average_write_rate,
                               uint64_t messages_enqueued,
                               uint64_t messages_dequeued,
                               uint64_t messages_written) = nullptr;
};
} // namespace internal
} // namespace mojo
#endif // MOJO_PUBLIC_CPP_BINDINGS_LIB_MESSAGE_QUOTA_CHECKER_H_