| // Copyright 2022 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef IPCZ_SRC_IPCZ_SEQUENCED_QUEUE_H_ |
| #define IPCZ_SRC_IPCZ_SEQUENCED_QUEUE_H_ |
| |
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

#include "ipcz/sequence_number.h"
#include "third_party/abseil-cpp/absl/base/macros.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
#include "third_party/abseil-cpp/absl/types/span.h"
| |
| namespace ipcz { |
| |
// Default ElementTraits used when no custom traits are supplied. Every element
// is attributed a size of zero, so size accounting based on these traits
// always sums to zero.
template <typename T>
struct DefaultSequencedQueueTraits {
  // Returns the measurable size attributed to an element. The element itself
  // is never inspected by the default traits.
  static size_t GetElementSize(const T& /*element*/) {
    return size_t{0};
  }
};
| |
| // SequencedQueue retains a queue of objects strictly ordered by SequenceNumber. |
| // This class is not thread-safe. |
| // |
| // This is useful in situations where queued elements may accumulate slightly |
| // out-of-order and need to be reordered efficiently for consumption. The |
| // implementation relies on an assumption that sequence gaps are common but tend |
| // to be small and short-lived. As such, a SequencedQueue retains at least |
| // enough linear storage to hold every object between the last popped |
| // SequenceNumber (exclusive) and the highest queued (or anticipated) |
| // SequenceNumber so far (inclusive). |
| // |
| // Storage may be sparsely populated at times, but as elements are consumed from |
| // the queue, storage is compacted to reduce waste. |
| // |
| // ElementTraits may be overridden to attribute a measurable size to each stored |
| // element. SequencedQueue performs additional accounting to efficiently track |
| // the sum of this size across the set of all currently available elements in |
| // the queue. |
| template <typename T, typename ElementTraits = DefaultSequencedQueueTraits<T>> |
| class SequencedQueue { |
| public: |
| SequencedQueue() = default; |
| |
| // Constructs a new SequencedQueue starting at `initial_sequence_number`. The |
| // queue will not accept elements with a SequenceNumber below this value, and |
| // this will be the SequenceNumber of the first element to be popped. |
| explicit SequencedQueue(SequenceNumber initial_sequence_number) |
| : base_sequence_number_(initial_sequence_number) {} |
| |
| SequencedQueue(SequencedQueue&& other) |
| : base_sequence_number_(other.base_sequence_number_), |
| num_entries_(other.num_entries_), |
| final_sequence_length_(other.final_sequence_length_) { |
| if (!other.storage_.empty()) { |
| size_t entries_offset = other.entries_.data() - storage_.data(); |
| storage_ = std::move(other.storage_); |
| entries_ = |
| EntryView(storage_.data() + entries_offset, other.entries_.size()); |
| } |
| } |
| |
| SequencedQueue& operator=(SequencedQueue&& other) { |
| base_sequence_number_ = other.base_sequence_number_; |
| num_entries_ = other.num_entries_; |
| final_sequence_length_ = other.final_sequence_length_; |
| if (!other.storage_.empty()) { |
| size_t entries_offset = other.entries_.data() - storage_.data(); |
| storage_ = std::move(other.storage_); |
| entries_ = |
| EntryView(storage_.data() + entries_offset, other.entries_.size()); |
| } else { |
| storage_.clear(); |
| entries_ = EntryView(storage_.data(), 0); |
| } |
| return *this; |
| } |
| |
| ~SequencedQueue() = default; |
| |
| // As a basic practical constraint, SequencedQueue won't tolerate sequence |
| // gaps larger than this value. |
| static constexpr uint64_t GetMaxSequenceGap() { return 1000000; } |
| |
| // The SequenceNumber of the next element that is or will be available from |
| // the queue. This starts at the constructor's `initial_sequence_number` and |
| // increments any time an element is successfully popped from the queue or a |
| // a SequenceNumber is explicitly skipped via SkipNextSequenceNumber(). |
| SequenceNumber current_sequence_number() const { |
| return base_sequence_number_; |
| } |
| |
| // The final length of the sequence that can be popped from this queue. Null |
| // if a final length has not yet been set. If the final length is N, then the |
| // last ordered element that can be pushed to or popped from the queue has a |
| // SequenceNumber of N-1. |
| const absl::optional<SequenceNumber>& final_sequence_length() const { |
| return final_sequence_length_; |
| } |
| |
| // Returns the number of elements currently ready for popping at the front of |
| // the queue. This is the number of contiguously sequenced elements |
| // available starting from `current_sequence_number()`. If the current |
| // SequenceNumber is 5 and this queue holds elements 5, 6, and 8, then this |
| // method returns 2: only elements 5 and 6 are available, because element 8 |
| // cannot be made available until element 7 is also available. |
| size_t GetNumAvailableElements() const { |
| if (entries_.empty() || !entries_[0].has_value()) { |
| return 0; |
| } |
| |
| return entries_[0]->num_entries_in_span; |
| } |
| |
| // Returns the total size of elements currently ready for popping at the |
| // front of the queue. This is the sum of ElementTraits::GetElementSize() for |
| // each element counted by `GetNumAvailableElements()`, and it is always |
| // returned in constant time. |
| size_t GetTotalAvailableElementSize() const { |
| if (entries_.empty() || !entries_[0].has_value()) { |
| return 0; |
| } |
| |
| return entries_[0]->total_span_size; |
| } |
| |
| // Returns the total length of the contiguous sequence already pushed and/or |
| // popped from this queue so far. This is essentially |
| // `current_sequence_number()` plus `GetNumAvailableElements()`. If |
| // `current_sequence_number()` is 5 and `GetNumAvailableElements()` is 3, then |
| // elements 5, 6, and 7 are available for retrieval and the current sequence |
| // length is 8; so this method would return 8. |
| SequenceNumber GetCurrentSequenceLength() const { |
| return SequenceNumber{current_sequence_number().value() + |
| GetNumAvailableElements()}; |
| } |
| |
| // Sets the final length of this queue's sequence. This is the SequenceNumber |
| // of the last element that can be pushed, plus 1. If this is set to zero, no |
| // elements can ever be pushed onto this queue. |
| // |
| // May fail and return false if the queue already has pushed and/or popped |
| // elements with a SequenceNumber greater than or equal to `length`, or if a |
| // the final sequence length had already been set prior to this call. |
| bool SetFinalSequenceLength(SequenceNumber length) { |
| if (final_sequence_length_) { |
| return false; |
| } |
| |
| const SequenceNumber lower_bound(base_sequence_number_.value() + |
| entries_.size()); |
| if (length < lower_bound) { |
| return false; |
| } |
| |
| if (length.value() - base_sequence_number_.value() > GetMaxSequenceGap()) { |
| return false; |
| } |
| |
| final_sequence_length_ = length; |
| return Reallocate(length); |
| } |
| |
| // Forcibly sets the final length of this queue's sequence to its currently |
| // available length. This means that if there is a gap in the available |
| // elements, the queue is cut off just before the gap and all elements beyond |
| // the gap are destroyed. If the final sequence length had already been set on |
| // this queue, this overrides that. |
| void ForceTerminateSequence() { |
| final_sequence_length_ = GetCurrentSequenceLength(); |
| num_entries_ = GetNumAvailableElements(); |
| if (num_entries_ == 0) { |
| storage_.clear(); |
| entries_ = {}; |
| return; |
| } |
| |
| const size_t entries_offset = entries_.data() - storage_.data(); |
| storage_.resize(entries_offset + num_entries_); |
| entries_ = EntryView(storage_.data() + entries_offset, num_entries_); |
| } |
| |
| // Indicates whether this queue is still expecting to have more elements |
| // pushed. This is always true if the final sequence length has not been set |
| // yet. |
| // |
| // Once the final sequence length is set, this remains true only until all |
| // elements between the initial sequence number (inclusive) and the final |
| // sequence length (exclusive) have been pushed into the queue. |
| bool ExpectsMoreElements() const { |
| if (!final_sequence_length_) { |
| return true; |
| } |
| |
| if (base_sequence_number_ >= *final_sequence_length_) { |
| return false; |
| } |
| |
| const size_t num_entries_remaining = |
| final_sequence_length_->value() - base_sequence_number_.value(); |
| return num_entries_ < num_entries_remaining; |
| } |
| |
| // Indicates whether the next element (in sequence order) is available to pop. |
| bool HasNextElement() const { |
| return !entries_.empty() && entries_[0].has_value(); |
| } |
| |
| // Indicates whether this queue's sequence has been fully consumed. This means |
| // the final sequence length has been set AND all elements up to that length |
| // have been pushed into and popped from the queue. |
| bool IsSequenceFullyConsumed() const { |
| return !HasNextElement() && !ExpectsMoreElements(); |
| } |
| |
| // Resets this queue to start at the initial SequenceNumber `n`. Must be |
| // called only on an empty queue and only when the caller can be sure they |
| // won't want to push any elements with a SequenceNumber below `n`. |
| void ResetInitialSequenceNumber(SequenceNumber n) { |
| ABSL_ASSERT(num_entries_ == 0); |
| base_sequence_number_ = n; |
| } |
| |
| // Attempts to skip SequenceNumber `n` in the sequence by advancing the |
| // current SequenceNumber by one. Returns true on success and false on |
| // failure. |
| // |
| // This can only succeed when `current_sequence_number()` is equal to `n`, no |
| // entry for SequenceNumber `n` is already in the queue, and the `n` is less |
| // the final sequence length if applicable. Success is equivalent to pushing |
| // and immediately popping element `n` except that it does not grow, shrink, |
| // or otherwise modify the queue's underlying storage. |
| bool MaybeSkipSequenceNumber(SequenceNumber n) { |
| if (base_sequence_number_ != n || HasNextElement() || |
| (final_sequence_length_ && *final_sequence_length_ <= n)) { |
| return false; |
| } |
| |
| base_sequence_number_ = SequenceNumber{n.value() + 1}; |
| if (num_entries_ != 0) { |
| entries_.remove_prefix(1); |
| } |
| return true; |
| } |
| |
| // Pushes an element into the queue with the given SequenceNumber. This may |
| // fail if `n` falls below the minimum or maximum (when applicable) expected |
| // sequence number for elements in this queue. |
| bool Push(SequenceNumber n, T element) { |
| if (n < base_sequence_number_ || |
| (n.value() - base_sequence_number_.value() > GetMaxSequenceGap())) { |
| return false; |
| } |
| |
| // Compute the appropriate index at which to store this new entry, given its |
| // SequenceNumber and the base SequenceNumber of element 0 in `entries_`. |
| size_t index = n.value() - base_sequence_number_.value(); |
| if (final_sequence_length_) { |
| // If `final_sequence_length_` is set, `entries_` must already be sized |
| // large enough to hold any valid Push(). |
| if (index >= entries_.size() || entries_[index].has_value()) { |
| // Out of bounds or duplicate entry. Fail. |
| return false; |
| } |
| PlaceNewEntry(index, n, element); |
| return true; |
| } |
| |
| if (index < entries_.size()) { |
| // `entries_` is already large enough to place this element without |
| // resizing. |
| if (entries_[index].has_value()) { |
| // Duplicate entry. Fail. |
| return false; |
| } |
| PlaceNewEntry(index, n, element); |
| return true; |
| } |
| |
| SequenceNumber new_limit(n.value() + 1); |
| if (new_limit == SequenceNumber(0)) { |
| return false; |
| } |
| |
| if (!Reallocate(new_limit)) { |
| return false; |
| } |
| |
| PlaceNewEntry(index, n, element); |
| return true; |
| } |
| |
| // Pops the next (in sequence order) element off the queue if available, |
| // populating `element` with its contents and returning true on success. On |
| // failure `element` is untouched and this returns false. |
| bool Pop(T& element) { |
| if (entries_.empty() || !entries_[0].has_value()) { |
| return false; |
| } |
| |
| Entry& head = *entries_[0]; |
| element = std::move(head.element); |
| |
| ABSL_ASSERT(num_entries_ > 0); |
| --num_entries_; |
| const SequenceNumber sequence_number = base_sequence_number_; |
| base_sequence_number_ = SequenceNumber{base_sequence_number_.value() + 1}; |
| |
| // Make sure the next queued entry has up-to-date accounting, if present. |
| if (entries_.size() > 1 && entries_[1]) { |
| Entry& next = *entries_[1]; |
| next.span_start = head.span_start; |
| next.span_end = head.span_end; |
| next.num_entries_in_span = head.num_entries_in_span - 1; |
| next.total_span_size = |
| head.total_span_size - ElementTraits::GetElementSize(element); |
| |
| size_t tail_index = next.span_end.value() - sequence_number.value(); |
| if (tail_index > 1) { |
| Entry& tail = *entries_[tail_index]; |
| tail.num_entries_in_span = next.num_entries_in_span; |
| tail.total_span_size = next.total_span_size; |
| } |
| } |
| |
| entries_[0].reset(); |
| entries_ = entries_.subspan(1); |
| |
| // If there's definitely no more populated element data, take this |
| // opportunity to realign `entries_` to the front of `storage_` to reduce |
| // future allocations. |
| if (num_entries_ == 0) { |
| entries_ = EntryView(storage_.data(), entries_.size()); |
| } |
| |
| return true; |
| } |
| |
| // Gets a reference to the next element. This reference is NOT stable across |
| // any non-const methods here. |
| T& NextElement() { |
| ABSL_ASSERT(HasNextElement()); |
| return entries_[0]->element; |
| } |
| |
| protected: |
| void ReduceNextElementSize(size_t amount) { |
| ABSL_ASSERT(HasNextElement()); |
| ABSL_ASSERT(entries_[0]->total_span_size >= amount); |
| entries_[0]->total_span_size -= amount; |
| } |
| |
| private: |
| bool Reallocate(SequenceNumber sequence_length) { |
| if (sequence_length < base_sequence_number_) { |
| return false; |
| } |
| |
| uint64_t new_entries_size = |
| sequence_length.value() - base_sequence_number_.value(); |
| if (new_entries_size > GetMaxSequenceGap()) { |
| return false; |
| } |
| |
| size_t entries_offset = entries_.data() - storage_.data(); |
| if (storage_.size() - entries_offset > new_entries_size) { |
| // Fast path: just extend the view into storage. |
| entries_ = EntryView(storage_.data() + entries_offset, new_entries_size); |
| return true; |
| } |
| |
| // We need to reallocate storage. Re-align `entries_` with the front of the |
| // buffer, and leave some extra room when allocating. |
| if (entries_offset > 0) { |
| for (size_t i = 0; i < entries_.size(); ++i) { |
| storage_[i] = std::move(entries_[i]); |
| entries_[i].reset(); |
| } |
| } |
| |
| storage_.resize(new_entries_size * 2); |
| entries_ = EntryView(storage_.data(), new_entries_size); |
| return true; |
| } |
| |
| // See detailed comments on Entry below for an explanation of this logic. |
| void PlaceNewEntry(size_t index, SequenceNumber n, T& element) { |
| ABSL_ASSERT(index < entries_.size()); |
| ABSL_ASSERT(!entries_[index].has_value()); |
| |
| entries_[index].emplace(); |
| Entry& entry = *entries_[index]; |
| entry.num_entries_in_span = 1; |
| entry.total_span_size = ElementTraits::GetElementSize(element); |
| |
| entry.element = std::move(element); |
| |
| if (index == 0 || !entries_[index - 1]) { |
| entry.span_start = n; |
| } else { |
| Entry& left = *entries_[index - 1]; |
| entry.span_start = left.span_start; |
| entry.num_entries_in_span += left.num_entries_in_span; |
| entry.total_span_size += left.total_span_size; |
| } |
| |
| if (index == entries_.size() - 1 || !entries_[index + 1]) { |
| entry.span_end = n; |
| } else { |
| Entry& right = *entries_[index + 1]; |
| entry.span_end = right.span_end; |
| entry.num_entries_in_span += right.num_entries_in_span; |
| entry.total_span_size += right.total_span_size; |
| } |
| |
| Entry* start; |
| if (entry.span_start <= base_sequence_number_) { |
| start = &entries_[0].value(); |
| } else { |
| const size_t start_index = |
| entry.span_start.value() - base_sequence_number_.value(); |
| start = &entries_[start_index].value(); |
| } |
| |
| ABSL_ASSERT(entry.span_end >= base_sequence_number_); |
| const size_t end_index = |
| entry.span_end.value() - base_sequence_number_.value(); |
| ABSL_ASSERT(end_index < entries_.size()); |
| Entry* end = &entries_[end_index].value(); |
| |
| start->span_end = entry.span_end; |
| start->num_entries_in_span = entry.num_entries_in_span; |
| start->total_span_size = entry.total_span_size; |
| |
| end->span_start = entry.span_start; |
| end->num_entries_in_span = entry.num_entries_in_span; |
| end->total_span_size = entry.total_span_size; |
| |
| ++num_entries_; |
| } |
| |
| struct Entry { |
| Entry() = default; |
| Entry(Entry&& other) = default; |
| Entry& operator=(Entry&&) = default; |
| ~Entry() = default; |
| |
| T element; |
| |
| // NOTE: The fields below are maintained during Push and Pop operations and |
| // are used to support efficient implementation of GetNumAvailableElements() |
| // and GetTotalAvailableElementSize(). This warrants some clarification. |
| // |
| // Conceptually we treat the active range of entries as a series of |
| // contiguous spans: |
| // |
| // `entries_`: [2][ ][4][5][6][ ][8][9] |
| // |
| // For example, above we can designate three contiguous spans: element 2 |
| // stands alone at the front of the queue, elements 4-6 form a second span, |
| // and then elements 8-9 form the third. Elements 3 and 7 are absent. |
| // |
| // We're interested in knowing how many elements (and their total size, as |
| // designated by ElementTraits) are available right now, which means we want |
| // to answer the question: how long is the span starting at element 0? In |
| // this case since element 2 stands alone at the front of the queue, the |
| // answer is 1. There's 1 element available right now. |
| // |
| // If we pop element 2 off the queue, it then becomes: |
| // |
| // `entries_`: [ ][4][5][6][ ][8][9] |
| // |
| // The head of the queue is pointing at the empty slot for element 3, and |
| // because no span starts in element 0 there are now 0 elements available to |
| // pop. |
| // |
| // Finally if we then push element 3, the queue looks like this: |
| // |
| // `entries_`: [3][4][5][6][ ][8][9] |
| // |
| // and now there are 4 elements available to pop. Element 0 begins the span |
| // of elements 3, 4, 5, and 6. |
| // |
| // To answer the question efficiently though, each entry records some |
| // metadata about the span in which it resides. This information is not kept |
| // up-to-date for all entries, but we maintain the invariant that the first |
| // and last element of each distinct span has accurate metadata; and as a |
| // consequence if any span starts at element 0, then we know element 0's |
| // metadata accurately answers our general questions about the head of the |
| // queue. |
| // |
| // When an element with sequence number N is inserted into the queue, it can |
| // be classified in one of four ways: |
| // |
| // (1) it stands alone with no element present at N-1 or N+1 |
| // (2) it follows an element at N-1, but N+1 is empty |
| // (3) it precedes an element at N+1, but N-1 is empty |
| // (4) it falls between present elements at both N-1 and N+1. |
| // |
| // In case (1) we record in the entry that its span starts and ends at |
| // element N; we also record the length of the span (1) and a traits-defined |
| // accounting of the element's "size". This entry now has trivially correct |
| // metadata about its containing span, which consists only of the entry |
| // itself. |
| // |
| // In case (2), element N is now the tail of a pre-existing span. Because |
| // tail elements are always up-to-date, we simply copy and augment the data |
| // from the old tail (element N-1) into the new tail (element N). From this |
| // data we also know where the head of the span is, and the head entry is |
| // also updated to reflect the same new metadata. |
| // |
| // Case (3) is similar to case (2). Element N is now the head of a |
| // pre-existing span, so we copy and augment the already up-to-date N+1 |
| // entry's metadata (the old head) into our new entry as well as the span's |
| // tail entry. |
| // |
| // Case (4) is joining two pre-existing spans. In this case element N |
| // fetches the span's start from element N-1 (the tail of the span to the |
| // left), and the span's end from element N+1 (the head of the span to the |
| // right); and it sums their element and byte counts with its own. This new |
| // combined metadata is copied into both the head of the left span and the |
| // tail of the right span, and with element N populated this now constitutes |
| // a single combined span with accurate metadata in its head and tail |
| // entries. |
| // |
| // Finally, the only other operation that matters for this accounting is |
| // Pop(). All Pop() needs to do is derive new metadata for the new |
| // head-of-queue's span (if present) after popping. This metadata is updated |
| // in the new head entry as well as its span's tail. |
| size_t num_entries_in_span = 0; |
| size_t total_span_size = 0; |
| SequenceNumber span_start{0}; |
| SequenceNumber span_end{0}; |
| }; |
| |
| using EntryStorage = absl::InlinedVector<absl::optional<Entry>, 4>; |
| using EntryView = absl::Span<absl::optional<Entry>>; |
| |
| // This is a sparse vector of queued elements indexed by a relative sequence |
| // number. |
| // |
| // It's sparse because the queue may push elements out of sequence order (e.g. |
| // elements 42 and 47 may be pushed before elements 43-46.) |
| EntryStorage storage_; |
| |
| // A view into `storage_` whose first element corresponds to the entry with |
| // sequence number `base_sequence_number_`. As elements are popped, the view |
| // moves forward in `storage_`. When convenient, we may reallocate `storage_` |
| // and realign this view. |
| EntryView entries_{storage_.data(), 0}; |
| |
| // The sequence number which corresponds to `entries_` index 0 when `entries_` |
| // is non-empty. |
| SequenceNumber base_sequence_number_{0}; |
| |
| // The number of slots in `entries_` which are actually occupied. |
| size_t num_entries_ = 0; |
| |
| // The final length of the sequence to be enqueued, if known. |
| absl::optional<SequenceNumber> final_sequence_length_; |
| }; |
| |
| } // namespace ipcz |
| |
| #endif // IPCZ_SRC_IPCZ_SEQUENCED_QUEUE_H_ |