blob: c2de7f75cd5c7711649ee2fe0e4d6b1fcad380cb [file] [log] [blame]
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef IPCZ_SRC_IPCZ_SEQUENCED_QUEUE_H_
#define IPCZ_SRC_IPCZ_SEQUENCED_QUEUE_H_
#include <cstddef>
#include <vector>
#include "ipcz/sequence_number.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
namespace ipcz {
template <typename T>
struct DefaultSequencedQueueTraits {
static size_t GetElementSize(const T& element) { return 0; }
};
// SequencedQueue retains a queue of objects strictly ordered by SequenceNumber.
// This class is not thread-safe.
//
// This is useful in situations where queued elements may accumulate slightly
// out-of-order and need to be reordered efficiently for consumption. The
// implementation relies on an assumption that sequence gaps are common but tend
// to be small and short-lived. As such, a SequencedQueue retains at least
// enough linear storage to hold every object between the last popped
// SequenceNumber (exclusive) and the highest queued (or anticipated)
// SequenceNumber so far (inclusive).
//
// Storage may be sparsely populated at times, but as elements are consumed from
// the queue, storage is compacted to reduce waste.
//
// ElementTraits may be overridden to attribute a measurable size to each stored
// element. SequencedQueue performs additional accounting to efficiently track
// the sum of this size across the set of all currently available elements in
// the queue.
template <typename T, typename ElementTraits = DefaultSequencedQueueTraits<T>>
class SequencedQueue {
public:
SequencedQueue() = default;
// Constructs a new SequencedQueue starting at `initial_sequence_number`. The
// queue will not accept elements with a SequenceNumber below this value, and
// this will be the SequenceNumber of the first element to be popped.
explicit SequencedQueue(SequenceNumber initial_sequence_number)
: base_sequence_number_(initial_sequence_number) {}
SequencedQueue(SequencedQueue&& other)
: base_sequence_number_(other.base_sequence_number_),
num_entries_(other.num_entries_),
final_sequence_length_(other.final_sequence_length_) {
if (!other.storage_.empty()) {
size_t entries_offset = other.entries_.data() - storage_.data();
storage_ = std::move(other.storage_);
entries_ =
EntryView(storage_.data() + entries_offset, other.entries_.size());
}
}
SequencedQueue& operator=(SequencedQueue&& other) {
base_sequence_number_ = other.base_sequence_number_;
num_entries_ = other.num_entries_;
final_sequence_length_ = other.final_sequence_length_;
if (!other.storage_.empty()) {
size_t entries_offset = other.entries_.data() - storage_.data();
storage_ = std::move(other.storage_);
entries_ =
EntryView(storage_.data() + entries_offset, other.entries_.size());
} else {
storage_.clear();
entries_ = EntryView(storage_.data(), 0);
}
return *this;
}
~SequencedQueue() = default;
// As a basic practical constraint, SequencedQueue won't tolerate sequence
// gaps larger than this value.
static constexpr uint64_t GetMaxSequenceGap() { return 1000000; }
// The SequenceNumber of the next element that is or will be available from
// the queue. This starts at the constructor's `initial_sequence_number` and
// increments any time an element is successfully popped from the queue or a
// a SequenceNumber is explicitly skipped via SkipNextSequenceNumber().
SequenceNumber current_sequence_number() const {
return base_sequence_number_;
}
// The final length of the sequence that can be popped from this queue. Null
// if a final length has not yet been set. If the final length is N, then the
// last ordered element that can be pushed to or popped from the queue has a
// SequenceNumber of N-1.
const absl::optional<SequenceNumber>& final_sequence_length() const {
return final_sequence_length_;
}
// Returns the number of elements currently ready for popping at the front of
// the queue. This is the number of contiguously sequenced elements
// available starting from `current_sequence_number()`. If the current
// SequenceNumber is 5 and this queue holds elements 5, 6, and 8, then this
// method returns 2: only elements 5 and 6 are available, because element 8
// cannot be made available until element 7 is also available.
size_t GetNumAvailableElements() const {
if (entries_.empty() || !entries_[0].has_value()) {
return 0;
}
return entries_[0]->num_entries_in_span;
}
// Returns the total size of elements currently ready for popping at the
// front of the queue. This is the sum of ElementTraits::GetElementSize() for
// each element counted by `GetNumAvailableElements()`, and it is always
// returned in constant time.
size_t GetTotalAvailableElementSize() const {
if (entries_.empty() || !entries_[0].has_value()) {
return 0;
}
return entries_[0]->total_span_size;
}
// Returns the total length of the contiguous sequence already pushed and/or
// popped from this queue so far. This is essentially
// `current_sequence_number()` plus `GetNumAvailableElements()`. If
// `current_sequence_number()` is 5 and `GetNumAvailableElements()` is 3, then
// elements 5, 6, and 7 are available for retrieval and the current sequence
// length is 8; so this method would return 8.
SequenceNumber GetCurrentSequenceLength() const {
return SequenceNumber{current_sequence_number().value() +
GetNumAvailableElements()};
}
// Sets the final length of this queue's sequence. This is the SequenceNumber
// of the last element that can be pushed, plus 1. If this is set to zero, no
// elements can ever be pushed onto this queue.
//
// May fail and return false if the queue already has pushed and/or popped
// elements with a SequenceNumber greater than or equal to `length`, or if a
// the final sequence length had already been set prior to this call.
bool SetFinalSequenceLength(SequenceNumber length) {
if (final_sequence_length_) {
return false;
}
const SequenceNumber lower_bound(base_sequence_number_.value() +
entries_.size());
if (length < lower_bound) {
return false;
}
if (length.value() - base_sequence_number_.value() > GetMaxSequenceGap()) {
return false;
}
final_sequence_length_ = length;
return Reallocate(length);
}
// Forcibly sets the final length of this queue's sequence to its currently
// available length. This means that if there is a gap in the available
// elements, the queue is cut off just before the gap and all elements beyond
// the gap are destroyed. If the final sequence length had already been set on
// this queue, this overrides that.
void ForceTerminateSequence() {
final_sequence_length_ = GetCurrentSequenceLength();
num_entries_ = GetNumAvailableElements();
if (num_entries_ == 0) {
storage_.clear();
entries_ = {};
return;
}
const size_t entries_offset = entries_.data() - storage_.data();
storage_.resize(entries_offset + num_entries_);
entries_ = EntryView(storage_.data() + entries_offset, num_entries_);
}
// Indicates whether this queue is still expecting to have more elements
// pushed. This is always true if the final sequence length has not been set
// yet.
//
// Once the final sequence length is set, this remains true only until all
// elements between the initial sequence number (inclusive) and the final
// sequence length (exclusive) have been pushed into the queue.
bool ExpectsMoreElements() const {
if (!final_sequence_length_) {
return true;
}
if (base_sequence_number_ >= *final_sequence_length_) {
return false;
}
const size_t num_entries_remaining =
final_sequence_length_->value() - base_sequence_number_.value();
return num_entries_ < num_entries_remaining;
}
// Indicates whether the next element (in sequence order) is available to pop.
bool HasNextElement() const {
return !entries_.empty() && entries_[0].has_value();
}
// Indicates whether this queue's sequence has been fully consumed. This means
// the final sequence length has been set AND all elements up to that length
// have been pushed into and popped from the queue.
bool IsSequenceFullyConsumed() const {
return !HasNextElement() && !ExpectsMoreElements();
}
// Resets this queue to start at the initial SequenceNumber `n`. Must be
// called only on an empty queue and only when the caller can be sure they
// won't want to push any elements with a SequenceNumber below `n`.
void ResetInitialSequenceNumber(SequenceNumber n) {
ABSL_ASSERT(num_entries_ == 0);
base_sequence_number_ = n;
}
// Attempts to skip SequenceNumber `n` in the sequence by advancing the
// current SequenceNumber by one. Returns true on success and false on
// failure.
//
// This can only succeed when `current_sequence_number()` is equal to `n`, no
// entry for SequenceNumber `n` is already in the queue, and the `n` is less
// the final sequence length if applicable. Success is equivalent to pushing
// and immediately popping element `n` except that it does not grow, shrink,
// or otherwise modify the queue's underlying storage.
bool MaybeSkipSequenceNumber(SequenceNumber n) {
if (base_sequence_number_ != n || HasNextElement() ||
(final_sequence_length_ && *final_sequence_length_ <= n)) {
return false;
}
base_sequence_number_ = SequenceNumber{n.value() + 1};
if (num_entries_ != 0) {
entries_.remove_prefix(1);
}
return true;
}
// Pushes an element into the queue with the given SequenceNumber. This may
// fail if `n` falls below the minimum or maximum (when applicable) expected
// sequence number for elements in this queue.
bool Push(SequenceNumber n, T element) {
if (n < base_sequence_number_ ||
(n.value() - base_sequence_number_.value() > GetMaxSequenceGap())) {
return false;
}
// Compute the appropriate index at which to store this new entry, given its
// SequenceNumber and the base SequenceNumber of element 0 in `entries_`.
size_t index = n.value() - base_sequence_number_.value();
if (final_sequence_length_) {
// If `final_sequence_length_` is set, `entries_` must already be sized
// large enough to hold any valid Push().
if (index >= entries_.size() || entries_[index].has_value()) {
// Out of bounds or duplicate entry. Fail.
return false;
}
PlaceNewEntry(index, n, element);
return true;
}
if (index < entries_.size()) {
// `entries_` is already large enough to place this element without
// resizing.
if (entries_[index].has_value()) {
// Duplicate entry. Fail.
return false;
}
PlaceNewEntry(index, n, element);
return true;
}
SequenceNumber new_limit(n.value() + 1);
if (new_limit == SequenceNumber(0)) {
return false;
}
if (!Reallocate(new_limit)) {
return false;
}
PlaceNewEntry(index, n, element);
return true;
}
// Pops the next (in sequence order) element off the queue if available,
// populating `element` with its contents and returning true on success. On
// failure `element` is untouched and this returns false.
bool Pop(T& element) {
if (entries_.empty() || !entries_[0].has_value()) {
return false;
}
Entry& head = *entries_[0];
element = std::move(head.element);
ABSL_ASSERT(num_entries_ > 0);
--num_entries_;
const SequenceNumber sequence_number = base_sequence_number_;
base_sequence_number_ = SequenceNumber{base_sequence_number_.value() + 1};
// Make sure the next queued entry has up-to-date accounting, if present.
if (entries_.size() > 1 && entries_[1]) {
Entry& next = *entries_[1];
next.span_start = head.span_start;
next.span_end = head.span_end;
next.num_entries_in_span = head.num_entries_in_span - 1;
next.total_span_size =
head.total_span_size - ElementTraits::GetElementSize(element);
size_t tail_index = next.span_end.value() - sequence_number.value();
if (tail_index > 1) {
Entry& tail = *entries_[tail_index];
tail.num_entries_in_span = next.num_entries_in_span;
tail.total_span_size = next.total_span_size;
}
}
entries_[0].reset();
entries_ = entries_.subspan(1);
// If there's definitely no more populated element data, take this
// opportunity to realign `entries_` to the front of `storage_` to reduce
// future allocations.
if (num_entries_ == 0) {
entries_ = EntryView(storage_.data(), entries_.size());
}
return true;
}
// Gets a reference to the next element. This reference is NOT stable across
// any non-const methods here.
T& NextElement() {
ABSL_ASSERT(HasNextElement());
return entries_[0]->element;
}
protected:
void ReduceNextElementSize(size_t amount) {
ABSL_ASSERT(HasNextElement());
ABSL_ASSERT(entries_[0]->total_span_size >= amount);
entries_[0]->total_span_size -= amount;
}
private:
bool Reallocate(SequenceNumber sequence_length) {
if (sequence_length < base_sequence_number_) {
return false;
}
uint64_t new_entries_size =
sequence_length.value() - base_sequence_number_.value();
if (new_entries_size > GetMaxSequenceGap()) {
return false;
}
size_t entries_offset = entries_.data() - storage_.data();
if (storage_.size() - entries_offset > new_entries_size) {
// Fast path: just extend the view into storage.
entries_ = EntryView(storage_.data() + entries_offset, new_entries_size);
return true;
}
// We need to reallocate storage. Re-align `entries_` with the front of the
// buffer, and leave some extra room when allocating.
if (entries_offset > 0) {
for (size_t i = 0; i < entries_.size(); ++i) {
storage_[i] = std::move(entries_[i]);
entries_[i].reset();
}
}
storage_.resize(new_entries_size * 2);
entries_ = EntryView(storage_.data(), new_entries_size);
return true;
}
// See detailed comments on Entry below for an explanation of this logic.
void PlaceNewEntry(size_t index, SequenceNumber n, T& element) {
ABSL_ASSERT(index < entries_.size());
ABSL_ASSERT(!entries_[index].has_value());
entries_[index].emplace();
Entry& entry = *entries_[index];
entry.num_entries_in_span = 1;
entry.total_span_size = ElementTraits::GetElementSize(element);
entry.element = std::move(element);
if (index == 0 || !entries_[index - 1]) {
entry.span_start = n;
} else {
Entry& left = *entries_[index - 1];
entry.span_start = left.span_start;
entry.num_entries_in_span += left.num_entries_in_span;
entry.total_span_size += left.total_span_size;
}
if (index == entries_.size() - 1 || !entries_[index + 1]) {
entry.span_end = n;
} else {
Entry& right = *entries_[index + 1];
entry.span_end = right.span_end;
entry.num_entries_in_span += right.num_entries_in_span;
entry.total_span_size += right.total_span_size;
}
Entry* start;
if (entry.span_start <= base_sequence_number_) {
start = &entries_[0].value();
} else {
const size_t start_index =
entry.span_start.value() - base_sequence_number_.value();
start = &entries_[start_index].value();
}
ABSL_ASSERT(entry.span_end >= base_sequence_number_);
const size_t end_index =
entry.span_end.value() - base_sequence_number_.value();
ABSL_ASSERT(end_index < entries_.size());
Entry* end = &entries_[end_index].value();
start->span_end = entry.span_end;
start->num_entries_in_span = entry.num_entries_in_span;
start->total_span_size = entry.total_span_size;
end->span_start = entry.span_start;
end->num_entries_in_span = entry.num_entries_in_span;
end->total_span_size = entry.total_span_size;
++num_entries_;
}
struct Entry {
Entry() = default;
Entry(Entry&& other) = default;
Entry& operator=(Entry&&) = default;
~Entry() = default;
T element;
// NOTE: The fields below are maintained during Push and Pop operations and
// are used to support efficient implementation of GetNumAvailableElements()
// and GetTotalAvailableElementSize(). This warrants some clarification.
//
// Conceptually we treat the active range of entries as a series of
// contiguous spans:
//
// `entries_`: [2][ ][4][5][6][ ][8][9]
//
// For example, above we can designate three contiguous spans: element 2
// stands alone at the front of the queue, elements 4-6 form a second span,
// and then elements 8-9 form the third. Elements 3 and 7 are absent.
//
// We're interested in knowing how many elements (and their total size, as
// designated by ElementTraits) are available right now, which means we want
// to answer the question: how long is the span starting at element 0? In
// this case since element 2 stands alone at the front of the queue, the
// answer is 1. There's 1 element available right now.
//
// If we pop element 2 off the queue, it then becomes:
//
// `entries_`: [ ][4][5][6][ ][8][9]
//
// The head of the queue is pointing at the empty slot for element 3, and
// because no span starts in element 0 there are now 0 elements available to
// pop.
//
// Finally if we then push element 3, the queue looks like this:
//
// `entries_`: [3][4][5][6][ ][8][9]
//
// and now there are 4 elements available to pop. Element 0 begins the span
// of elements 3, 4, 5, and 6.
//
// To answer the question efficiently though, each entry records some
// metadata about the span in which it resides. This information is not kept
// up-to-date for all entries, but we maintain the invariant that the first
// and last element of each distinct span has accurate metadata; and as a
// consequence if any span starts at element 0, then we know element 0's
// metadata accurately answers our general questions about the head of the
// queue.
//
// When an element with sequence number N is inserted into the queue, it can
// be classified in one of four ways:
//
// (1) it stands alone with no element present at N-1 or N+1
// (2) it follows an element at N-1, but N+1 is empty
// (3) it precedes an element at N+1, but N-1 is empty
// (4) it falls between present elements at both N-1 and N+1.
//
// In case (1) we record in the entry that its span starts and ends at
// element N; we also record the length of the span (1) and a traits-defined
// accounting of the element's "size". This entry now has trivially correct
// metadata about its containing span, which consists only of the entry
// itself.
//
// In case (2), element N is now the tail of a pre-existing span. Because
// tail elements are always up-to-date, we simply copy and augment the data
// from the old tail (element N-1) into the new tail (element N). From this
// data we also know where the head of the span is, and the head entry is
// also updated to reflect the same new metadata.
//
// Case (3) is similar to case (2). Element N is now the head of a
// pre-existing span, so we copy and augment the already up-to-date N+1
// entry's metadata (the old head) into our new entry as well as the span's
// tail entry.
//
// Case (4) is joining two pre-existing spans. In this case element N
// fetches the span's start from element N-1 (the tail of the span to the
// left), and the span's end from element N+1 (the head of the span to the
// right); and it sums their element and byte counts with its own. This new
// combined metadata is copied into both the head of the left span and the
// tail of the right span, and with element N populated this now constitutes
// a single combined span with accurate metadata in its head and tail
// entries.
//
// Finally, the only other operation that matters for this accounting is
// Pop(). All Pop() needs to do is derive new metadata for the new
// head-of-queue's span (if present) after popping. This metadata is updated
// in the new head entry as well as its span's tail.
size_t num_entries_in_span = 0;
size_t total_span_size = 0;
SequenceNumber span_start{0};
SequenceNumber span_end{0};
};
using EntryStorage = absl::InlinedVector<absl::optional<Entry>, 4>;
using EntryView = absl::Span<absl::optional<Entry>>;
// This is a sparse vector of queued elements indexed by a relative sequence
// number.
//
// It's sparse because the queue may push elements out of sequence order (e.g.
// elements 42 and 47 may be pushed before elements 43-46.)
EntryStorage storage_;
// A view into `storage_` whose first element corresponds to the entry with
// sequence number `base_sequence_number_`. As elements are popped, the view
// moves forward in `storage_`. When convenient, we may reallocate `storage_`
// and realign this view.
EntryView entries_{storage_.data(), 0};
// The sequence number which corresponds to `entries_` index 0 when `entries_`
// is non-empty.
SequenceNumber base_sequence_number_{0};
// The number of slots in `entries_` which are actually occupied.
size_t num_entries_ = 0;
// The final length of the sequence to be enqueued, if known.
absl::optional<SequenceNumber> final_sequence_length_;
};
} // namespace ipcz
#endif // IPCZ_SRC_IPCZ_SEQUENCED_QUEUE_H_