Merge "Automated commit: libchrome r1248627 uprev" into main
diff --git a/BASE_VER b/BASE_VER
index 9aaf412..2c0bdc2 100644
--- a/BASE_VER
+++ b/BASE_VER
@@ -1 +1 @@
-1248016
+1248627
diff --git a/base/allocator/partition_alloc_support.cc b/base/allocator/partition_alloc_support.cc
index 921d4bb..05fa310 100644
--- a/base/allocator/partition_alloc_support.cc
+++ b/base/allocator/partition_alloc_support.cc
@@ -4,6 +4,7 @@
#include "base/allocator/partition_alloc_support.h"
+#include <base/ranges/algorithm.h>
#include <array>
#include <cinttypes>
#include <cstdint>
@@ -398,36 +399,30 @@
stacktrace, "\r\n", KEEP_WHITESPACE, SPLIT_WANT_NONEMPTY);
// We are looking for the callers of the function releasing the raw_ptr and
- // freeing memory:
- const StringPiece callees[] = {
- // Common signatures
- "internal::PartitionFree",
- "base::(anonymous namespace)::FreeFn",
+ // freeing memory. This lists potential matching patterns. A pattern is a list
+ // of substrings that are all required to match.
+ const std::vector<StringPiece> callee_patterns[] = {
+ // Common signature patters:
+ {"internal::PartitionFree"},
+ {"base::", "::FreeFn"},
+ {"internal::RawPtrBackupRefImpl", "::ReleaseInternal"},
- // Linux signatures
- "internal::RawPtrBackupRefImpl<>::ReleaseInternal()",
- "base::RefCountedThreadSafe<>::Release()",
+ // Linux specific:
+ {"base::RefCountedThreadSafe<>::Release"},
- // Windows signatures
- "internal::RawPtrBackupRefImpl<0,0>::ReleaseInternal",
- "internal::RawPtrBackupRefImpl<0,1>::ReleaseInternal",
- "_free_base",
-
- // Mac signatures
- "internal::RawPtrBackupRefImpl<false, false>::ReleaseInternal",
- "internal::RawPtrBackupRefImpl<false, true>::ReleaseInternal",
-
- // ChromeOS signatures
- "base::allocator::dispatcher::internal::DispatcherImpl<>::FreeFn()",
+ // Windows specific:
+ {"_free_base"},
// Task traces are prefixed with "Task trace:" in
// |TaskTrace::OutputToStream|
- "Task trace:",
+ {"Task trace:"},
};
size_t caller_index = 0;
for (size_t i = 0; i < lines.size(); ++i) {
- for (const auto& callee : callees) {
- if (lines[i].find(callee) != StringPiece::npos) {
+ for (const auto& patterns : callee_patterns) {
+ if (ranges::all_of(patterns, [&](const StringPiece& pattern) {
+ return lines[i].find(pattern) != StringPiece::npos;
+ })) {
caller_index = i + 1;
}
}
diff --git a/base/allocator/partition_allocator/src/partition_alloc/pointers/instance_tracer.cc b/base/allocator/partition_allocator/src/partition_alloc/pointers/instance_tracer.cc
index 5cc91d8..9b18b557 100644
--- a/base/allocator/partition_allocator/src/partition_alloc/pointers/instance_tracer.cc
+++ b/base/allocator/partition_allocator/src/partition_alloc/pointers/instance_tracer.cc
@@ -12,7 +12,6 @@
#include "partition_alloc/partition_alloc_base/check.h"
#include "partition_alloc/partition_alloc_base/debug/stack_trace.h"
#include "partition_alloc/partition_alloc_base/no_destructor.h"
-#include "partition_alloc/partition_ref_count.h"
#include "partition_alloc/partition_root.h"
namespace base::internal {
@@ -55,7 +54,7 @@
const std::pair<uintptr_t, size_t> slot_and_size =
partition_alloc::PartitionAllocGetSlotStartAndSizeInBRPPool(address);
const uintptr_t slot_count = reinterpret_cast<uintptr_t>(
- partition_alloc::internal::PartitionRefCountPointer(
+ partition_alloc::PartitionRoot::RefCountPointerFromSlotStartAndSize(
slot_and_size.first, slot_and_size.second));
const std::lock_guard guard(GetStorageMutex());
@@ -86,7 +85,7 @@
partition_alloc::PartitionAllocGetSlotStartAndSizeInBRPPool(
reinterpret_cast<uintptr_t>(address));
const uintptr_t slot_count = reinterpret_cast<uintptr_t>(
- partition_alloc::internal::PartitionRefCountPointer(
+ partition_alloc::PartitionRoot::RefCountPointerFromSlotStartAndSize(
slot_and_size.first, slot_and_size.second));
return GetStackTracesForDanglingRefs(slot_count);
}
diff --git a/base/android/requires_api.h b/base/android/requires_api.h
new file mode 100644
index 0000000..35cd93b
--- /dev/null
+++ b/base/android/requires_api.h
@@ -0,0 +1,38 @@
+// Copyright 2024 The Chromium Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_ANDROID_REQUIRES_API_H_
+#define BASE_ANDROID_REQUIRES_API_H_
+
+// Sets the API version of a symbol. Analogous to how @RequiresApi sets the API
+// version of Java symbols.
+//
+// A compiler warning (-Wunguarded-availability) is emitted when symbols with
+// this annotation are reference by code that has a lower API version.
+//
+// The default API version is set by the default_min_sdk_version GN arg.
+//
+// To reference a symbol from code with a lower api level, you must use:
+// if (__builtin_available(android API_VERSION, *)) { ... }
+//
+// See also:
+// https://android.googlesource.com/platform/ndk/+/master/docs/BuildSystemMaintainers.md#weak-symbols-for-api-definitions
+#define REQUIRES_ANDROID_API(x) \
+ __attribute__((__availability__(android, introduced = x)))
+
+// Sets the default API version for all symbols.
+//
+// Usage:
+// #pragma clang attribute push DEFAULT_REQUIRES_ANDROID_API(29)
+// ...
+// #pragma clang attribute pop
+//
+// For use only within .cc files so that declarations within header files are
+// clearly labeled.
+#define DEFAULT_REQUIRES_ANDROID_API(x) \
+ (REQUIRES_ANDROID_API(x), \
+ apply_to = any(enum, enum_constant, field, function, namespace, record, \
+ type_alias, variable))
+
+#endif // BASE_ANDROID_REQUIRES_API_H_
diff --git a/base/containers/heap_array.h b/base/containers/heap_array.h
index 59b084b..9bec60d 100644
--- a/base/containers/heap_array.h
+++ b/base/containers/heap_array.h
@@ -59,6 +59,12 @@
return HeapArray(std::unique_ptr<T[]>(new T[size]), size);
}
+ static HeapArray CopiedFrom(base::span<const T> that) {
+ auto result = HeapArray::Uninit(that.size());
+ result.copy_from(that);
+ return result;
+ }
+
// Constructs an empty array and does not allocate any memory.
HeapArray()
requires(std::constructible_from<T>)
diff --git a/base/containers/heap_array_unittest.cc b/base/containers/heap_array_unittest.cc
index 57afc35..6e3b824 100644
--- a/base/containers/heap_array_unittest.cc
+++ b/base/containers/heap_array_unittest.cc
@@ -202,6 +202,18 @@
#endif
}
+TEST(HeapArray, CopiedFrom) {
+ base::span<uint32_t> empty_span;
+ auto empty_vec = base::HeapArray<uint32_t>::CopiedFrom(empty_span);
+ EXPECT_EQ(0u, empty_vec.size());
+
+ const uint32_t kData[] = {1000u, 1001u};
+ auto vec = base::HeapArray<uint32_t>::CopiedFrom(kData);
+ ASSERT_EQ(2u, vec.size());
+ EXPECT_EQ(1000u, vec[0]);
+ EXPECT_EQ(1001u, vec[1]);
+}
+
TEST(HeapArray, RunsDestructor) {
size_t count = 0;
{
diff --git a/base/files/file_proxy.cc b/base/files/file_proxy.cc
index 92bd952..b4430e0 100644
--- a/base/files/file_proxy.cc
+++ b/base/files/file_proxy.cc
@@ -7,12 +7,15 @@
#include <memory>
#include <utility>
+#include "base/containers/heap_array.h"
#include "base/files/file.h"
#include "base/files/file_util.h"
#include "base/functional/bind.h"
#include "base/functional/callback_helpers.h"
#include "base/location.h"
+#include "base/numerics/safe_conversions.h"
#include "base/task/task_runner.h"
+#include "third_party/abseil-cpp/absl/types/optional.h"
namespace {
@@ -170,25 +173,37 @@
public:
ReadHelper(base::WeakPtr<FileProxy> proxy, File file, int bytes_to_read)
: FileHelper(std::move(proxy), std::move(file)),
- buffer_(new char[static_cast<size_t>(bytes_to_read)]),
- bytes_to_read_(bytes_to_read) {}
+ // SAFETY - References to `buffer_` are provided as a span only after
+ // successfully reading some bytes.
+ buffer_(base::HeapArray<uint8_t>::Uninit(
+ static_cast<size_t>(bytes_to_read))) {}
+
ReadHelper(const ReadHelper&) = delete;
ReadHelper& operator=(const ReadHelper&) = delete;
void RunWork(int64_t offset) {
- bytes_read_ = file_.Read(offset, buffer_.get(), bytes_to_read_);
- error_ = (bytes_read_ < 0) ? File::FILE_ERROR_FAILED : File::FILE_OK;
+ absl::optional<size_t> result = file_.Read(offset, buffer_);
+ if (!result.has_value()) {
+ bytes_read_ = -1;
+ error_ = File::FILE_ERROR_FAILED;
+ return;
+ }
+ bytes_read_ = checked_cast<int>(result.value());
+ error_ = File::FILE_OK;
}
void Reply(FileProxy::ReadCallback callback) {
PassFile();
DCHECK(!callback.is_null());
- std::move(callback).Run(error_, buffer_.get(), bytes_read_);
+ base::span<uint8_t> read_span;
+ if (error_ == File::FILE_OK) {
+ read_span = buffer_.first(checked_cast<size_t>(bytes_read_));
+ }
+ std::move(callback).Run(error_, base::as_chars(read_span));
}
private:
- std::unique_ptr<char[]> buffer_;
- int bytes_to_read_;
+ base::HeapArray<uint8_t> buffer_;
int bytes_read_ = 0;
};
@@ -196,19 +211,22 @@
public:
WriteHelper(base::WeakPtr<FileProxy> proxy,
File file,
- const char* buffer,
- int bytes_to_write)
+ base::span<const uint8_t> data)
: FileHelper(std::move(proxy), std::move(file)),
- buffer_(new char[static_cast<size_t>(bytes_to_write)]),
- bytes_to_write_(bytes_to_write) {
- memcpy(buffer_.get(), buffer, static_cast<size_t>(bytes_to_write));
- }
+ buffer_(base::HeapArray<uint8_t>::CopiedFrom(data)) {}
+
WriteHelper(const WriteHelper&) = delete;
WriteHelper& operator=(const WriteHelper&) = delete;
void RunWork(int64_t offset) {
- bytes_written_ = file_.Write(offset, buffer_.get(), bytes_to_write_);
- error_ = (bytes_written_ < 0) ? File::FILE_ERROR_FAILED : File::FILE_OK;
+ absl::optional<size_t> result = file_.Write(offset, buffer_);
+ if (!result.has_value()) {
+ bytes_written_ = -1;
+ error_ = File::FILE_ERROR_FAILED;
+ return;
+ }
+ bytes_written_ = checked_cast<int>(result.value());
+ error_ = File::FILE_OK;
}
void Reply(FileProxy::WriteCallback callback) {
@@ -218,15 +236,13 @@
}
private:
- std::unique_ptr<char[]> buffer_;
- int bytes_to_write_;
+ base::HeapArray<uint8_t> buffer_;
int bytes_written_ = 0;
};
} // namespace
-FileProxy::FileProxy(TaskRunner* task_runner) : task_runner_(task_runner) {
-}
+FileProxy::FileProxy(TaskRunner* task_runner) : task_runner_(task_runner) {}
FileProxy::~FileProxy() {
if (file_.IsValid())
@@ -311,15 +327,15 @@
}
bool FileProxy::Write(int64_t offset,
- const char* buffer,
- int bytes_to_write,
+ base::span<const uint8_t> data,
WriteCallback callback) {
DCHECK(file_.IsValid());
- if (bytes_to_write <= 0 || buffer == nullptr)
+ if (data.empty()) {
return false;
+ }
+ WriteHelper* helper =
+ new WriteHelper(weak_ptr_factory_.GetWeakPtr(), std::move(file_), data);
- WriteHelper* helper = new WriteHelper(
- weak_ptr_factory_.GetWeakPtr(), std::move(file_), buffer, bytes_to_write);
return task_runner_->PostTaskAndReply(
FROM_HERE, BindOnce(&WriteHelper::RunWork, Unretained(helper), offset),
BindOnce(&WriteHelper::Reply, Owned(helper), std::move(callback)));
diff --git a/base/files/file_proxy.h b/base/files/file_proxy.h
index 1dae4a6..40ff76b 100644
--- a/base/files/file_proxy.h
+++ b/base/files/file_proxy.h
@@ -8,6 +8,7 @@
#include <stdint.h>
#include "base/base_export.h"
+#include "base/containers/span.h"
#include "base/files/file.h"
#include "base/files/file_path.h"
#include "base/functional/callback_forward.h"
@@ -44,7 +45,7 @@
using GetFileInfoCallback =
OnceCallback<void(File::Error, const File::Info&)>;
using ReadCallback =
- OnceCallback<void(File::Error, const char* data, int bytes_read)>;
+ OnceCallback<void(File::Error, base::span<const char> data)>;
using WriteCallback = OnceCallback<void(File::Error, int bytes_written)>;
explicit FileProxy(TaskRunner* task_runner);
@@ -109,8 +110,7 @@
// This returns false if |bytes_to_write| is less than or equal to zero,
// if |buffer| is NULL, or if task posting to |task_runner| has failed.
bool Write(int64_t offset,
- const char* buffer,
- int bytes_to_write,
+ base::span<const uint8_t> data,
WriteCallback callback);
// Proxies File::SetTimes. The callback can be null.
diff --git a/base/files/file_proxy_unittest.cc b/base/files/file_proxy_unittest.cc
index f50a210..df1ca30 100644
--- a/base/files/file_proxy_unittest.cc
+++ b/base/files/file_proxy_unittest.cc
@@ -7,8 +7,10 @@
#include <stddef.h>
#include <stdint.h>
+#include <string_view>
#include <utility>
+#include "base/containers/heap_array.h"
#include "base/files/file.h"
#include "base/files/file_util.h"
#include "base/files/scoped_temp_dir.h"
@@ -65,11 +67,9 @@
void DidRead(base::RepeatingClosure continuation,
File::Error error,
- const char* data,
- int bytes_read) {
+ base::span<const char> data) {
error_ = error;
- buffer_.resize(bytes_read);
- memcpy(&buffer_[0], data, bytes_read);
+ buffer_ = base::HeapArray<char>::CopiedFrom(data);
continuation.Run();
}
@@ -105,7 +105,7 @@
File::Error error_;
FilePath path_;
File::Info file_info_;
- std::vector<char> buffer_;
+ base::HeapArray<char> buffer_;
int bytes_written_;
WeakPtrFactory<FileProxyTest> weak_factory_{this};
};
@@ -215,7 +215,7 @@
// The file should be writable.
{
RunLoop run_loop;
- proxy.Write(0, "test", 4,
+ proxy.Write(0, base::as_byte_span(std::string_view("test")),
BindOnce(&FileProxyTest::DidWrite, weak_factory_.GetWeakPtr(),
run_loop.QuitWhenIdleClosure()));
run_loop.Run();
@@ -321,17 +321,17 @@
FileProxy proxy(file_task_runner());
CreateProxy(File::FLAG_CREATE | File::FLAG_WRITE, &proxy);
- const char data[] = "foo!";
- size_t data_bytes = std::size(data);
+ auto write_span = base::as_byte_span("foo!");
+ EXPECT_EQ(write_span.size(), 5u); // Includes the NUL, too.
{
RunLoop run_loop;
- proxy.Write(0, data, data_bytes,
+ proxy.Write(0, write_span,
BindOnce(&FileProxyTest::DidWrite, weak_factory_.GetWeakPtr(),
run_loop.QuitWhenIdleClosure()));
run_loop.Run();
}
EXPECT_EQ(File::FILE_OK, error_);
- EXPECT_EQ(static_cast<int>(data_bytes), bytes_written_);
+ EXPECT_EQ(write_span.size(), static_cast<size_t>(bytes_written_));
// Flush the written data. (So that the following read should always
// succeed. On some platforms it may work with or without this flush.)
@@ -344,11 +344,11 @@
EXPECT_EQ(File::FILE_OK, error_);
// Verify the written data.
- char buffer[10];
- EXPECT_EQ(data_bytes,
- base::ReadFile(TestPath(), make_span(buffer, data_bytes)));
- for (size_t i = 0; i < data_bytes; ++i) {
- EXPECT_EQ(data[i], buffer[i]);
+ char read_buffer[10];
+ EXPECT_GE(std::size(read_buffer), write_span.size());
+ EXPECT_EQ(write_span.size(), base::ReadFile(TestPath(), read_buffer));
+ for (size_t i = 0; i < write_span.size(); ++i) {
+ EXPECT_EQ(write_span[i], read_buffer[i]);
}
}
diff --git a/base/functional/concurrent_callbacks.h b/base/functional/concurrent_callbacks.h
new file mode 100644
index 0000000..649911e
--- /dev/null
+++ b/base/functional/concurrent_callbacks.h
@@ -0,0 +1,127 @@
+// Copyright 2024 The Chromium Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_
+#define BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_
+
+#include <memory>
+#include <type_traits>
+#include <vector>
+
+#include "base/functional/bind.h"
+#include "base/functional/callback.h"
+#include "base/location.h"
+#include "base/memory/raw_ptr.h"
+#include "base/task/bind_post_task.h"
+#include "base/task/sequenced_task_runner.h"
+
+// OVERVIEW:
+//
+// ConcurrentCallbacks<T> is an alternative to BarrierCallback<T>, it dispenses
+// OnceCallbacks via CreateCallback() and invokes the callback passed to Done()
+// after all prior callbacks have been run.
+//
+// ConcurrentCallbacks<T> is intended to be used over BarrierCallback<T> in
+// cases where the count is unknown prior to requiring a callback to start a
+// task, and for cases where the count is manually derived from the code and
+// subject to human error.
+//
+// IMPORTANT NOTES:
+//
+// - ConcurrentCallbacks<T> is NOT thread safe.
+// - The done callback will NOT be run synchronously, it will be PostTask() to
+// the sequence that Done() was invoked on.
+// - ConcurrentCallbacks<T> cannot be used after Done() is called, a CHECK
+// verifies this.
+//
+// TYPICAL USAGE:
+//
+// class Example {
+// void OnRequestsReceived(std::vector<Request> requests) {
+// base::ConcurrentCallbacks<Result> concurrent;
+//
+// for (Request& request : requests) {
+// if (IsValidRequest(request)) {
+// StartRequest(std::move(request), concurrent.CreateCallback());
+// }
+// }
+//
+// std::move(concurrent).Done(
+// base::BindOnce(&Example::OnRequestsComplete, GetWeakPtr()));
+// }
+//
+// void StartRequest(Request request,
+// base::OnceCallback<void(Result)> callback) {
+// // Process the request asynchronously and call callback with a Result.
+// }
+//
+// void OnRequestsComplete(std::vector<Result> results) {
+// // Invoked after all requests are completed and receives the results of
+// // all of them.
+// }
+// };
+
+namespace base {
+
+template <typename T>
+class ConcurrentCallbacks {
+ public:
+ using Results = std::vector<std::remove_cvref_t<T>>;
+
+ ConcurrentCallbacks() {
+ auto info_owner = std::make_unique<Info>();
+ info_ = info_owner.get();
+ info_run_callback_ = BindRepeating(&Info::Run, std::move(info_owner));
+ }
+
+ // Create a callback for the done callback to wait for.
+ [[nodiscard]] OnceCallback<void(T)> CreateCallback() {
+ CHECK(info_);
+ ++info_->pending_;
+ return info_run_callback_;
+ }
+
+ // Finish creating concurrent callbacks and provide done callback to run once
+ // all prior callbacks have executed.
+ // `this` is no longer usable after calling Done(), must be called with
+ // std::move().
+ void Done(OnceCallback<void(Results)> done_callback,
+ const Location& location = FROM_HERE) && {
+ CHECK(info_);
+ info_->done_callback_ =
+ BindPostTask(SequencedTaskRunner::GetCurrentDefault(),
+ std::move(done_callback), location);
+ if (info_->pending_ == 0u) {
+ std::move(info_->done_callback_).Run(std::move(info_->results_));
+ }
+ info_ = nullptr;
+ }
+
+ private:
+ class Info {
+ public:
+ Info() = default;
+
+ void Run(T value) {
+ CHECK_GT(pending_, 0u);
+ --pending_;
+ results_.push_back(std::move(value));
+ if (done_callback_ && pending_ == 0u) {
+ std::move(done_callback_).Run(std::move(results_));
+ }
+ }
+
+ size_t pending_ = 0u;
+ Results results_;
+ OnceCallback<void(Results)> done_callback_;
+ };
+
+ RepeatingCallback<void(T)> info_run_callback_;
+ // info_ is owned by info_run_callback_.
+ raw_ptr<Info> info_;
+};
+
+} // namespace base
+
+#endif // BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_
diff --git a/base/functional/concurrent_callbacks_unittest.cc b/base/functional/concurrent_callbacks_unittest.cc
new file mode 100644
index 0000000..224e89e
--- /dev/null
+++ b/base/functional/concurrent_callbacks_unittest.cc
@@ -0,0 +1,133 @@
+// Copyright 2024 The Chromium Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/functional/concurrent_callbacks.h"
+
+#include <vector>
+
+#include "base/functional/callback.h"
+#include "base/test/gtest_util.h"
+#include "base/test/task_environment.h"
+#include "base/test/test_future.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+
+namespace {
+
+TEST(ConcurrentCallbacksTest, Empty) {
+ test::SingleThreadTaskEnvironment task_environment;
+
+ ConcurrentCallbacks<size_t> concurrent;
+
+ test::TestFuture<std::vector<size_t>> future;
+ std::move(concurrent).Done(future.GetCallback());
+
+ std::vector<size_t> values = future.Take();
+ EXPECT_TRUE(values.empty());
+}
+
+TEST(ConcurrentCallbacksTest, RunBeforeDone) {
+ test::SingleThreadTaskEnvironment task_environment;
+
+ ConcurrentCallbacks<size_t> concurrent;
+
+ for (size_t i = 0; i < 10; ++i) {
+ concurrent.CreateCallback().Run(i);
+ }
+
+ test::TestFuture<std::vector<size_t>> future;
+ std::move(concurrent).Done(future.GetCallback());
+
+ EXPECT_FALSE(future.IsReady());
+
+ std::vector<size_t> values = future.Take();
+ EXPECT_EQ(values.size(), 10u);
+ for (size_t i = 0; i < values.size(); ++i) {
+ EXPECT_EQ(values[i], i);
+ }
+}
+
+TEST(ConcurrentCallbacksTest, RunAfterDone) {
+ test::SingleThreadTaskEnvironment task_environment;
+
+ ConcurrentCallbacks<size_t> concurrent;
+
+ std::vector<base::OnceCallback<void(size_t)>> callbacks;
+ for (size_t i = 0; i < 10; ++i) {
+ callbacks.push_back(concurrent.CreateCallback());
+ }
+
+ test::TestFuture<std::vector<size_t>> future;
+ std::move(concurrent).Done(future.GetCallback());
+
+ for (size_t i = 0; i < callbacks.size(); ++i) {
+ std::move(callbacks[i]).Run(i);
+ }
+
+ EXPECT_FALSE(future.IsReady());
+
+ std::vector<size_t> values = future.Take();
+ EXPECT_EQ(values.size(), 10u);
+ for (size_t i = 0; i < values.size(); ++i) {
+ EXPECT_EQ(values[i], i);
+ }
+}
+
+TEST(ConcurrentCallbacksTest, CallbacksOutliveObject) {
+ test::SingleThreadTaskEnvironment task_environment;
+
+ std::vector<base::OnceCallback<void(size_t)>> callbacks;
+ test::TestFuture<std::vector<size_t>> future;
+
+ {
+ ConcurrentCallbacks<size_t> concurrent;
+ for (size_t i = 0; i < 10; ++i) {
+ callbacks.push_back(concurrent.CreateCallback());
+ }
+ std::move(concurrent).Done(future.GetCallback());
+ }
+
+ for (size_t i = 0; i < callbacks.size(); ++i) {
+ std::move(callbacks[i]).Run(i);
+ }
+
+ EXPECT_FALSE(future.IsReady());
+
+ std::vector<size_t> values = future.Take();
+ EXPECT_EQ(values.size(), 10u);
+ for (size_t i = 0; i < values.size(); ++i) {
+ EXPECT_EQ(values[i], i);
+ }
+}
+
+TEST(ConcurrentCallbacksTest, CallbackAcceptsConstRef) {
+ test::SingleThreadTaskEnvironment task_environment;
+
+ ConcurrentCallbacks<const size_t&> concurrent;
+
+ std::vector<base::OnceCallback<void(const size_t&)>> callbacks;
+ for (size_t i = 0; i < 10; ++i) {
+ callbacks.push_back(concurrent.CreateCallback());
+ }
+
+ test::TestFuture<std::vector<size_t>> future;
+ std::move(concurrent).Done(future.GetCallback());
+
+ for (size_t i = 0; i < callbacks.size(); ++i) {
+ std::move(callbacks[i]).Run(i);
+ }
+
+ EXPECT_FALSE(future.IsReady());
+
+ std::vector<size_t> values = future.Take();
+ EXPECT_EQ(values.size(), 10u);
+ for (size_t i = 0; i < values.size(); ++i) {
+ EXPECT_EQ(values[i], i);
+ }
+}
+
+} // namespace
+
+} // namespace base
diff --git a/base/functional/concurrent_closures.cc b/base/functional/concurrent_closures.cc
new file mode 100644
index 0000000..d662d59
--- /dev/null
+++ b/base/functional/concurrent_closures.cc
@@ -0,0 +1,49 @@
+// Copyright 2024 The Chromium Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/functional/concurrent_closures.h"
+
+#include "base/functional/bind.h"
+#include "base/task/sequenced_task_runner.h"
+
+namespace base {
+
+ConcurrentClosures::ConcurrentClosures() {
+ auto info_owner = std::make_unique<Info>();
+ info_ = info_owner.get();
+ info_run_closure_ = BindRepeating(&Info::Run, std::move(info_owner));
+}
+ConcurrentClosures::~ConcurrentClosures() = default;
+
+OnceClosure ConcurrentClosures::CreateClosure() {
+ CHECK(info_);
+ ++info_->pending_;
+ return info_run_closure_;
+}
+
+void ConcurrentClosures::Done(OnceClosure done_closure,
+ const Location& location) && {
+ CHECK(info_);
+ info_->done_closure_ = BindPostTask(SequencedTaskRunner::GetCurrentDefault(),
+ std::move(done_closure), location);
+ if (info_->pending_ == 0u) {
+ std::move(info_->done_closure_).Run();
+ }
+ info_ = nullptr;
+}
+
+ConcurrentClosures::Info::Info() = default;
+
+ConcurrentClosures::Info::~Info() = default;
+
+void ConcurrentClosures::Info::Run() {
+ CHECK_GT(pending_, 0u);
+ --pending_;
+ if (done_closure_ && pending_ == 0u) {
+ SequencedTaskRunner::GetCurrentDefault()->PostTask(
+ FROM_HERE, std::move(done_closure_));
+ }
+}
+
+} // namespace base
diff --git a/base/functional/concurrent_closures.h b/base/functional/concurrent_closures.h
new file mode 100644
index 0000000..0d85a5e
--- /dev/null
+++ b/base/functional/concurrent_closures.h
@@ -0,0 +1,81 @@
+// Copyright 2024 The Chromium Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_FUNCTIONAL_CONCURRENT_CLOSURES_H_
+#define BASE_FUNCTIONAL_CONCURRENT_CLOSURES_H_
+
+#include "base/base_export.h"
+#include "base/functional/callback.h"
+#include "base/location.h"
+#include "base/memory/raw_ptr.h"
+#include "base/task/bind_post_task.h"
+
+namespace base {
+
+// OVERVIEW:
+//
+// ConcurrentClosures is a OnceClosure version of ConcurrentCallbacks<T> and an
+// alternative to BarrierClosure, it dispenses OnceClosures via CreateClosure()
+// and invokes the closure passed to Done() after all prior closures have been
+// run.
+//
+// ConcurrentClosures is intended to be used over BarrierClosure in
+// cases where the count is unknown prior to requiring a closure to start a
+// task, and for cases where the count is manually derived from the code and
+// subject to human error.
+//
+// IMPORTANT NOTES:
+//
+// - ConcurrentClosures is NOT thread safe.
+// - The done closure will NOT be run synchronously, it will be PostTask() to
+// the sequence that Done() was invoked on.
+// - ConcurrentClosures cannot be used after Done() is called, a CHECK verifies
+// this.
+//
+// TYPICAL USAGE:
+//
+// void DoABC(OnceClosure closure) {
+// base::ConcurrentClosures concurrent;
+//
+// DoA(concurrent.CreateClosure());
+// DoB(concurrent.CreateClosure());
+// DoC(concurrent.CreateClosure());
+//
+// std::move(concurrent).Done(closure);
+// }
+
+class BASE_EXPORT ConcurrentClosures {
+ public:
+ ConcurrentClosures();
+ ~ConcurrentClosures();
+
+ // Create a closure for the done closure to wait for.
+ [[nodiscard]] OnceClosure CreateClosure();
+
+ // Finish creating concurrent closures and provide done closure to run once
+ // all prior closures have executed.
+ // `this` is no longer usable after calling Done(), must be called with
+ // std::move().
+ void Done(OnceClosure done_closure, const Location& location = FROM_HERE) &&;
+
+ private:
+ class Info {
+ public:
+ Info();
+ ~Info();
+
+ void Run();
+
+ size_t pending_ = 0u;
+ OnceClosure done_closure_;
+ };
+
+ RepeatingClosure info_run_closure_;
+ // info_ is owned by info_run_closure_.
+ raw_ptr<Info> info_;
+};
+
+} // namespace base
+
+#endif // BASE_FUNCTIONAL_CONCURRENT_CLOSURES_H_
diff --git a/base/functional/concurrent_closures_unittest.cc b/base/functional/concurrent_closures_unittest.cc
new file mode 100644
index 0000000..a79ae26
--- /dev/null
+++ b/base/functional/concurrent_closures_unittest.cc
@@ -0,0 +1,96 @@
+// Copyright 2024 The Chromium Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/functional/concurrent_closures.h"
+
+#include <vector>
+
+#include "base/functional/callback.h"
+#include "base/test/gtest_util.h"
+#include "base/test/task_environment.h"
+#include "base/test/test_future.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+
+namespace {
+
+TEST(ConcurrentClosuresTest, Empty) {
+ test::SingleThreadTaskEnvironment task_environment;
+
+ ConcurrentClosures concurrent;
+
+ test::TestFuture<void> future;
+ std::move(concurrent).Done(future.GetCallback());
+
+ EXPECT_FALSE(future.IsReady());
+
+ EXPECT_TRUE(future.Wait());
+}
+
+TEST(ConcurrentClosuresTest, RunBeforeDone) {
+ test::SingleThreadTaskEnvironment task_environment;
+
+ ConcurrentClosures concurrent;
+
+ for (size_t i = 0; i < 10; ++i) {
+ concurrent.CreateClosure().Run();
+ }
+
+ test::TestFuture<void> future;
+ std::move(concurrent).Done(future.GetCallback());
+
+ EXPECT_FALSE(future.IsReady());
+
+ EXPECT_TRUE(future.Wait());
+}
+
+TEST(ConcurrentClosuresTest, RunAfterDone) {
+ test::SingleThreadTaskEnvironment task_environment;
+
+ ConcurrentClosures concurrent;
+
+ std::vector<base::OnceClosure> closures;
+ for (size_t i = 0; i < 10; ++i) {
+ closures.push_back(concurrent.CreateClosure());
+ }
+
+ test::TestFuture<void> future;
+ std::move(concurrent).Done(future.GetCallback());
+
+ for (base::OnceClosure& callback : closures) {
+ std::move(callback).Run();
+ }
+
+ EXPECT_FALSE(future.IsReady());
+
+ EXPECT_TRUE(future.Wait());
+}
+
+TEST(ConcurrentClosuresTest, CallbacksOutliveObject) {
+ test::SingleThreadTaskEnvironment task_environment;
+
+ std::vector<base::OnceClosure> closures;
+ test::TestFuture<void> future;
+
+ {
+ ConcurrentClosures concurrent;
+ for (size_t i = 0; i < 10; ++i) {
+ closures.push_back(concurrent.CreateClosure());
+ }
+ std::move(concurrent).Done(future.GetCallback());
+ }
+
+ for (base::OnceClosure& callback : closures) {
+ std::move(callback).Run();
+ }
+
+ EXPECT_FALSE(future.IsReady());
+
+ EXPECT_TRUE(future.Wait());
+}
+
+} // namespace
+
+} // namespace base
diff --git a/base/sequence_checker_impl.cc b/base/sequence_checker_impl.cc
index 990cf2a..b25e7af 100644
--- a/base/sequence_checker_impl.cc
+++ b/base/sequence_checker_impl.cc
@@ -57,6 +57,7 @@
TS_UNCHECKED_READ(bound_at_) = std::move(TS_UNCHECKED_READ(other.bound_at_));
TS_UNCHECKED_READ(sequence_token_) = TS_UNCHECKED_READ(other.sequence_token_);
+ TS_UNCHECKED_READ(thread_ref_) = TS_UNCHECKED_READ(other.thread_ref_);
// `other.bound_at_` was moved from so it's null.
TS_UNCHECKED_READ(other.sequence_token_) = internal::SequenceToken();
@@ -71,6 +72,8 @@
// If we're detached, bind to current state.
EnsureAssigned();
+ CHECK(!thread_ref_.is_null());
+
// Return true if called from the bound sequence.
if (sequence_token_ == internal::SequenceToken::GetForCurrentThread()) {
return true;
diff --git a/base/sequence_checker_unittest.cc b/base/sequence_checker_unittest.cc
index 0f0484b..4ee9b13 100644
--- a/base/sequence_checker_unittest.cc
+++ b/base/sequence_checker_unittest.cc
@@ -12,6 +12,7 @@
#include "base/functional/bind.h"
#include "base/functional/callback_helpers.h"
+#include "base/sequence_checker_impl.h"
#include "base/sequence_token.h"
#include "base/task/single_thread_task_runner.h"
#include "base/task/thread_pool.h"
@@ -283,26 +284,38 @@
// in ~SequenceCheckerOwner.
class SequenceCheckerOwner {
public:
- SequenceCheckerOwner() = default;
+ explicit SequenceCheckerOwner(SequenceCheckerImpl* other_checker)
+ : other_checker_(other_checker) {}
SequenceCheckerOwner(const SequenceCheckerOwner&) = delete;
SequenceCheckerOwner& operator=(const SequenceCheckerOwner&) = delete;
- ~SequenceCheckerOwner() { EXPECT_TRUE(checker_.CalledOnValidSequence()); }
+ ~SequenceCheckerOwner() {
+ // Check passes on TLS destruction.
+ EXPECT_TRUE(checker_.CalledOnValidSequence());
+
+ // Check also passes on TLS destruction after move assignment.
+ *other_checker_ = std::move(checker_);
+ EXPECT_TRUE(other_checker_->CalledOnValidSequence());
+ }
private:
SequenceCheckerImpl checker_;
+ raw_ptr<SequenceCheckerImpl> other_checker_;
};
// Verifies SequenceCheckerImpl::CalledOnValidSequence() returns true if called
// during thread destruction.
TEST(SequenceCheckerTest, FromThreadDestruction) {
SequenceChecker::EnableStackLogging();
+
+ SequenceCheckerImpl other_checker;
ThreadLocalOwnedPointer<SequenceCheckerOwner> thread_local_owner;
{
test::TaskEnvironment task_environment;
auto task_runner = ThreadPool::CreateSequencedTaskRunner({});
task_runner->PostTask(
FROM_HERE, BindLambdaForTesting([&]() {
- thread_local_owner.Set(std::make_unique<SequenceCheckerOwner>());
+ thread_local_owner.Set(
+ std::make_unique<SequenceCheckerOwner>(&other_checker));
}));
task_runner = nullptr;
task_environment.RunUntilIdle();
diff --git a/base/sync_socket.h b/base/sync_socket.h
index 0d72883..a0a9c50 100644
--- a/base/sync_socket.h
+++ b/base/sync_socket.h
@@ -12,6 +12,7 @@
#include <stddef.h>
#include "base/base_export.h"
+#include "base/containers/span.h"
#include "base/files/platform_file.h"
#include "base/synchronization/waitable_event.h"
#include "base/time/time.h"
@@ -50,20 +51,30 @@
// Sends the message to the remote peer of the SyncSocket.
// Note it is not safe to send messages from the same socket handle by
// multiple threads simultaneously.
- // buffer is a pointer to the data to send.
- // length is the length of the data to send (must be non-zero).
+ // `data` must be non-empty.
// Returns the number of bytes sent, or 0 upon failure.
+ virtual size_t Send(span<const uint8_t> data);
+ // Same as above, but with the following parameters:
+ // `buffer` is a pointer to the data to send.
+ // `length` is the length of the data to send (must be non-zero).
+ // TODO(https://crbug.com/1490484): Migrate callers to the span version.
virtual size_t Send(const void* buffer, size_t length);
// Receives a message from an SyncSocket.
- // buffer is a pointer to the buffer to receive data.
- // length is the number of bytes of data to receive (must be non-zero).
+ // The data will be received in `buffer`, which must be non-empty.
// Returns the number of bytes received, or 0 upon failure.
+ virtual size_t Receive(span<uint8_t> buffer);
+ // Same as above, but with the following parameters:
+ // `buffer` is a pointer to the buffer to receive data.
+ // `length` is the number of bytes of data to receive (must be non-zero).
+ // TODO(https://crbug.com/1490484): Migrate callers to the span version.
virtual size_t Receive(void* buffer, size_t length);
- // Same as Receive() but only blocks for data until |timeout| has elapsed or
- // |buffer| |length| is exhausted. Currently only timeouts less than one
- // second are allowed. Return the amount of data read.
+ // Same as Receive() but only blocks for data until `timeout` has elapsed or
+ // `buffer` is exhausted. Currently only timeouts less than one second are
+ // allowed. Returns the number of bytes read.
+ virtual size_t ReceiveWithTimeout(span<uint8_t> buffer, TimeDelta timeout);
+ // TODO(https://crbug.com/1490484): Migrate callers to the span version.
virtual size_t ReceiveWithTimeout(void* buffer,
size_t length,
TimeDelta timeout);
@@ -116,17 +127,23 @@
// supported on <Vista. So, for Windows only, we override these
// SyncSocket methods in order to support shutting down the 'socket'.
void Close() override;
+ size_t Receive(span<uint8_t> buffer) override;
+ // TODO(https://crbug.com/1490484): Migrate callers to the span version.
size_t Receive(void* buffer, size_t length) override;
+ size_t ReceiveWithTimeout(span<uint8_t> buffer, TimeDelta timeout) override;
+ // TODO(https://crbug.com/1490484): Migrate callers to the span version.
size_t ReceiveWithTimeout(void* buffer,
size_t length,
TimeDelta timeout) override;
#endif
// Send() is overridden to catch cases where the remote end is not responding
- // and we fill the local socket buffer. When the buffer is full, this
+ // and we fill the local socket buffer. When `data` is full, this
// implementation of Send() will not block indefinitely as
// SyncSocket::Send will, but instead return 0, as no bytes could be sent.
// Note that the socket will not be closed in this case.
+ size_t Send(span<const uint8_t> data) override;
+ // TODO(https://crbug.com/1490484): Migrate callers to the span version.
size_t Send(const void* buffer, size_t length) override;
private:
diff --git a/base/sync_socket_posix.cc b/base/sync_socket_posix.cc
index 6520f0a..0a8d673 100644
--- a/base/sync_socket_posix.cc
+++ b/base/sync_socket_posix.cc
@@ -34,16 +34,10 @@
// Writes |length| of |buffer| into |handle|. Returns the number of bytes
// written or zero on error. |length| must be greater than 0.
-size_t SendHelper(SyncSocket::Handle handle,
- const void* buffer,
- size_t length) {
- DCHECK_GT(length, 0u);
- DCHECK_LE(length, kMaxMessageLength);
+size_t SendHelper(SyncSocket::Handle handle, span<const uint8_t> data) {
+ CHECK_LE(data.size(), kMaxMessageLength);
DCHECK_NE(handle, SyncSocket::kInvalidHandle);
- return WriteFileDescriptor(
- handle, make_span(static_cast<const uint8_t*>(buffer), length))
- ? length
- : 0;
+ return WriteFileDescriptor(handle, data) ? data.size() : 0;
}
} // namespace
@@ -91,19 +85,21 @@
handle_.reset();
}
-size_t SyncSocket::Send(const void* buffer, size_t length) {
+size_t SyncSocket::Send(span<const uint8_t> data) {
ScopedBlockingCall scoped_blocking_call(FROM_HERE, BlockingType::MAY_BLOCK);
- return SendHelper(handle(), buffer, length);
+ return SendHelper(handle(), data);
}
-size_t SyncSocket::Receive(void* buffer, size_t length) {
+size_t SyncSocket::Send(const void* buffer, size_t length) {
+ return Send(make_span(static_cast<const uint8_t*>(buffer), length));
+}
+
+size_t SyncSocket::Receive(span<uint8_t> buffer) {
ScopedBlockingCall scoped_blocking_call(FROM_HERE, BlockingType::MAY_BLOCK);
- DCHECK_GT(length, 0u);
- DCHECK_LE(length, kMaxMessageLength);
+ CHECK_LE(buffer.size(), kMaxMessageLength);
DCHECK(IsValid());
- char* charbuffer = static_cast<char*>(buffer);
- if (ReadFromFD(handle(), make_span(charbuffer, length))) {
- return length;
+ if (ReadFromFD(handle(), as_writable_chars(buffer))) {
+ return buffer.size();
}
return 0;
}
@@ -111,9 +107,17 @@
size_t SyncSocket::ReceiveWithTimeout(void* buffer,
size_t length,
TimeDelta timeout) {
+ return ReceiveWithTimeout(make_span(static_cast<uint8_t*>(buffer), length),
+ std::move(timeout));
+}
+
+size_t SyncSocket::Receive(void* buffer, size_t length) {
+ return Receive(make_span(static_cast<uint8_t*>(buffer), length));
+}
+
+size_t SyncSocket::ReceiveWithTimeout(span<uint8_t> buffer, TimeDelta timeout) {
ScopedBlockingCall scoped_blocking_call(FROM_HERE, BlockingType::MAY_BLOCK);
- DCHECK_GT(length, 0u);
- DCHECK_LE(length, kMaxMessageLength);
+ CHECK_LE(buffer.size(), kMaxMessageLength);
DCHECK(IsValid());
// Only timeouts greater than zero and less than one second are allowed.
@@ -130,7 +134,7 @@
pollfd.revents = 0;
size_t bytes_read_total = 0;
- while (bytes_read_total < length) {
+ while (!buffer.empty()) {
const TimeDelta this_timeout = finish_time - TimeTicks::Now();
const int timeout_ms =
static_cast<int>(this_timeout.InMillisecondsRoundedUp());
@@ -151,15 +155,15 @@
// No special handling is needed for error (POLLERR); we can let any of the
// following operations fail and handle it there.
DCHECK(pollfd.revents & (POLLIN | POLLHUP | POLLERR)) << pollfd.revents;
- const size_t bytes_to_read = std::min(Peek(), length - bytes_read_total);
+ const size_t bytes_to_read = std::min(Peek(), buffer.size());
// There may be zero bytes to read if the socket at the other end closed.
if (!bytes_to_read)
return bytes_read_total;
- const size_t bytes_received =
- Receive(static_cast<char*>(buffer) + bytes_read_total, bytes_to_read);
+ const size_t bytes_received = Receive(buffer.subspan(0u, bytes_to_read));
bytes_read_total += bytes_received;
+ buffer = buffer.subspan(bytes_received);
if (bytes_received != bytes_to_read)
return bytes_read_total;
}
@@ -194,9 +198,8 @@
return HANDLE_EINTR(shutdown(handle(), SHUT_RDWR)) >= 0;
}
-size_t CancelableSyncSocket::Send(const void* buffer, size_t length) {
- DCHECK_GT(length, 0u);
- DCHECK_LE(length, kMaxMessageLength);
+size_t CancelableSyncSocket::Send(span<const uint8_t> data) {
+ CHECK_LE(data.size(), kMaxMessageLength);
DCHECK(IsValid());
const int flags = fcntl(handle(), F_GETFL);
@@ -206,7 +209,7 @@
fcntl(handle(), F_SETFL, flags | O_NONBLOCK);
}
- const size_t len = SendHelper(handle(), buffer, length);
+ const size_t len = SendHelper(handle(), data);
if (flags != -1 && (flags & O_NONBLOCK) == 0) {
// Restore the original flags.
@@ -216,6 +219,10 @@
return len;
}
+size_t CancelableSyncSocket::Send(const void* buffer, size_t length) {
+ return Send(make_span(static_cast<const uint8_t*>(buffer), length));
+}
+
// static
bool CancelableSyncSocket::CreatePair(CancelableSyncSocket* socket_a,
CancelableSyncSocket* socket_b) {
diff --git a/base/sync_socket_unittest.cc b/base/sync_socket_unittest.cc
index f407f26..6e6ead4 100644
--- a/base/sync_socket_unittest.cc
+++ b/base/sync_socket_unittest.cc
@@ -78,9 +78,11 @@
// Verify |socket_a| can send to |socket_a| and |socket_a| can Receive from
// |socket_a|.
- ASSERT_EQ(sizeof(kSending), socket_a->Send(&kSending, sizeof(kSending)));
+ ASSERT_EQ(sizeof(kSending),
+ socket_a->Send(as_bytes(make_span(&kSending, 1u))));
ASSERT_EQ(sizeof(kSending), socket_b->Peek());
- ASSERT_EQ(sizeof(kSending), socket_b->Receive(&received, sizeof(kSending)));
+ ASSERT_EQ(sizeof(kSending),
+ socket_b->Receive(as_writable_bytes(make_span(&received, 1u))));
ASSERT_EQ(kSending, received);
ASSERT_EQ(0u, socket_a->Peek());
@@ -88,9 +90,11 @@
// Now verify the reverse.
received = 0;
- ASSERT_EQ(sizeof(kSending), socket_b->Send(&kSending, sizeof(kSending)));
+ ASSERT_EQ(sizeof(kSending),
+ socket_b->Send(as_bytes(make_span(&kSending, 1u))));
ASSERT_EQ(sizeof(kSending), socket_a->Peek());
- ASSERT_EQ(sizeof(kSending), socket_a->Receive(&received, sizeof(kSending)));
+ ASSERT_EQ(sizeof(kSending),
+ socket_a->Receive(as_writable_bytes(make_span(&received, 1u))));
ASSERT_EQ(kSending, received);
ASSERT_EQ(0u, socket_a->Peek());
@@ -173,7 +177,7 @@
TEST_F(CancelableSyncSocketTest, ReceiveAfterShutdown) {
socket_a_.Shutdown();
int data = 0;
- EXPECT_EQ(0u, socket_a_.Receive(&data, sizeof(data)));
+ EXPECT_EQ(0u, socket_a_.Receive(as_writable_bytes(make_span(&data, 1u))));
}
TEST_F(CancelableSyncSocketTest, ReceiveWithTimeoutAfterShutdown) {
diff --git a/build/install-build-deps.py b/build/install-build-deps.py
index 3d3cdaf..a38ad22 100755
--- a/build/install-build-deps.py
+++ b/build/install-build-deps.py
@@ -374,6 +374,8 @@
"libxrender1",
"libxtst6",
"x11-utils",
+ "xserver-xorg-core", # TODO(crbug.com/1417069): Experimental.
+ "xserver-xorg-video-dummy", # TODO(crbug.com/1417069): Experimental.
"xvfb",
"zlib1g",
]
diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
index ef479db..8e96016 100644
--- a/ipc/ipc_mojo_bootstrap.cc
+++ b/ipc/ipc_mojo_bootstrap.cc
@@ -24,13 +24,14 @@
#include "base/memory/raw_ptr.h"
#include "base/no_destructor.h"
#include "base/ranges/algorithm.h"
+#include "base/sequence_checker.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/common/task_annotator.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
-#include "base/threading/thread_checker.h"
+#include "base/thread_annotations.h"
#include "base/trace_event/memory_allocator_dump.h"
#include "base/trace_event/memory_dump_manager.h"
#include "base/trace_event/memory_dump_provider.h"
@@ -177,13 +178,14 @@
control_message_handler_(this),
control_message_proxy_thunk_(this),
control_message_proxy_(&control_message_proxy_thunk_) {
- thread_checker_.DetachFromThread();
control_message_handler_.SetDescription(
"IPC::mojom::Bootstrap [primary] PipeControlMessageHandler");
dispatcher_.SetValidator(std::make_unique<mojo::MessageHeaderValidator>(
"IPC::mojom::Bootstrap [primary] MessageHeaderValidator"));
GetMemoryDumpProvider().AddController(this);
+
+ DETACH_FROM_SEQUENCE(sequence_checker_);
}
ChannelAssociatedGroupController(const ChannelAssociatedGroupController&) =
@@ -214,16 +216,23 @@
}
void Pause() {
- DCHECK(!paused_);
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ CHECK(was_bound_or_message_sent_);
+ CHECK(!paused_);
paused_ = true;
}
void Unpause() {
- DCHECK(paused_);
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ CHECK(was_bound_or_message_sent_);
+ CHECK(paused_);
paused_ = false;
}
void FlushOutgoingMessages() {
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ CHECK(was_bound_or_message_sent_);
+
std::vector<mojo::Message> outgoing_messages;
{
base::AutoLock lock(outgoing_messages_lock_);
@@ -237,6 +246,8 @@
void Bind(mojo::ScopedMessagePipeHandle handle,
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) {
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+
connector_ = std::make_unique<mojo::Connector>(
std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
"IPC Channel");
@@ -282,12 +293,21 @@
std::move(sender_handle), 0);
*receiver = mojo::PendingAssociatedReceiver<mojom::Channel>(
std::move(receiver_handle));
+
+ if (!was_bound_or_message_sent_) {
+ was_bound_or_message_sent_ = true;
+ DETACH_FROM_SEQUENCE(sequence_checker_);
+ }
}
- void StartReceiving() { connector_->StartReceiving(task_runner_); }
+ void StartReceiving() {
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ CHECK(was_bound_or_message_sent_);
+ connector_->StartReceiving(task_runner_);
+ }
void ShutDown() {
- DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
shut_down_ = true;
if (connector_)
connector_->CloseMessagePipe();
@@ -449,7 +469,10 @@
bool PrefersSerializedMessages() override { return true; }
void SetUrgentMessageObserver(UrgentMessageObserver* observer) {
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ CHECK(!was_bound_or_message_sent_);
urgent_message_observer_ = observer;
+ DETACH_FROM_SEQUENCE(sequence_checker_);
}
private:
@@ -847,7 +870,6 @@
bool SendMessage(mojo::Message* message) {
DCHECK(message->heap_profiler_tag());
if (task_runner_->BelongsToCurrentThread()) {
- DCHECK(thread_checker_.CalledOnValidThread());
return SendMessageOnSequence(message);
}
@@ -868,6 +890,9 @@
}
bool SendMessageOnSequence(mojo::Message* message) {
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ was_bound_or_message_sent_ = true;
+
if (!connector_ || paused_) {
if (!shut_down_) {
base::AutoLock lock(outgoing_messages_lock_);
@@ -885,7 +910,7 @@
}
void OnPipeError() {
- DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// We keep |this| alive here because it's possible for the notifications
// below to release all other references.
@@ -921,8 +946,8 @@
}
}
- void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) {
- lock_.AssertAcquired();
+ void NotifyEndpointOfError(Endpoint* endpoint, bool force_async)
+ EXCLUSIVE_LOCKS_REQUIRED(lock_) {
DCHECK(endpoint->task_runner() && endpoint->client());
if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) {
mojo::InterfaceEndpointClient* client = endpoint->client();
@@ -959,35 +984,35 @@
// Marks `endpoint` as closed and returns true if and only if its peer was
// also already closed.
- bool MarkClosed(Endpoint* endpoint) {
- lock_.AssertAcquired();
+ bool MarkClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
endpoint->set_closed();
return endpoint->peer_closed();
}
// Marks `endpoint` as having a closed peer and returns true if and only if
// `endpoint` itself was also already closed.
- bool MarkPeerClosed(Endpoint* endpoint) {
- lock_.AssertAcquired();
+ bool MarkPeerClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
endpoint->set_peer_closed();
endpoint->SignalSyncMessageEvent();
return endpoint->closed();
}
- void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
+ void MarkClosedAndMaybeRemove(Endpoint* endpoint)
+ EXCLUSIVE_LOCKS_REQUIRED(lock_) {
if (MarkClosed(endpoint)) {
endpoints_.erase(endpoint->id());
}
}
- void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
+ void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint)
+ EXCLUSIVE_LOCKS_REQUIRED(lock_) {
if (MarkPeerClosed(endpoint)) {
endpoints_.erase(endpoint->id());
}
}
- Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
- lock_.AssertAcquired();
+ Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted)
+ EXCLUSIVE_LOCKS_REQUIRED(lock_) {
DCHECK(!inserted || !*inserted);
Endpoint* endpoint = FindEndpoint(id);
@@ -1000,15 +1025,14 @@
return endpoint;
}
- Endpoint* FindEndpoint(mojo::InterfaceId id) {
- lock_.AssertAcquired();
+ Endpoint* FindEndpoint(mojo::InterfaceId id) EXCLUSIVE_LOCKS_REQUIRED(lock_) {
auto iter = endpoints_.find(id);
return iter != endpoints_.end() ? iter->second.get() : nullptr;
}
// mojo::MessageReceiver:
bool Accept(mojo::Message* message) override {
- DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!message->DeserializeAssociatedEndpointHandles(this))
return false;
@@ -1207,7 +1231,7 @@
bool OnPeerAssociatedEndpointClosed(
mojo::InterfaceId id,
const std::optional<mojo::DisconnectReason>& reason) override {
- DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
base::AutoLock locker(lock_);
@@ -1229,42 +1253,47 @@
return false;
}
- // Checked in places which must be run on the primary endpoint's thread.
- base::ThreadChecker thread_checker_;
-
- scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
-
+ const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
const scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
const bool set_interface_id_namespace_bit_;
- bool paused_ = false;
- std::unique_ptr<mojo::Connector> connector_;
- mojo::MessageDispatcher dispatcher_;
- mojo::PipeControlMessageHandler control_message_handler_;
- ControlMessageProxyThunk control_message_proxy_thunk_;
+
+ // Ensures sequenced access to members below.
+ SEQUENCE_CHECKER(sequence_checker_);
+
+ // Whether `Bind()` or `SendMessageOnSequence()` was called.
+ // `sequence_checker_` can be detached when this is `false`.
+ bool was_bound_or_message_sent_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
+
+ bool paused_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
+ bool shut_down_ GUARDED_BY_CONTEXT(sequence_checker_) = false;
+ std::unique_ptr<mojo::Connector> connector_
+ GUARDED_BY_CONTEXT(sequence_checker_);
+ mojo::MessageDispatcher dispatcher_ GUARDED_BY_CONTEXT(sequence_checker_);
+ mojo::PipeControlMessageHandler control_message_handler_
+ GUARDED_BY_CONTEXT(sequence_checker_);
+ ControlMessageProxyThunk control_message_proxy_thunk_
+ GUARDED_BY_CONTEXT(sequence_checker_);
+ raw_ptr<UrgentMessageObserver> urgent_message_observer_
+ GUARDED_BY_CONTEXT(sequence_checker_) = nullptr;
// NOTE: It is unsafe to call into this object while holding |lock_|.
mojo::PipeControlMessageProxy control_message_proxy_;
- // Guards access to |outgoing_messages_| only. Used to support memory dumps
- // which may be triggered from any thread.
+ // Outgoing messages sent before this controller Bound() to a pipe or while it
+ // was paused. Protected by a lock to support memory dumps from any thread.
base::Lock outgoing_messages_lock_;
-
- // Outgoing messages that were sent before this controller was bound to a
- // real message pipe.
- std::vector<mojo::Message> outgoing_messages_;
+ std::vector<mojo::Message> outgoing_messages_
+ GUARDED_BY(outgoing_messages_lock_);
// Guards the fields below for thread-safe access.
base::Lock lock_;
- bool encountered_error_ = false;
- bool shut_down_ = false;
+ bool encountered_error_ GUARDED_BY(lock_) = false;
// ID #1 is reserved for the mojom::Channel interface.
- uint32_t next_interface_id_ = 2;
+ uint32_t next_interface_id_ GUARDED_BY(lock_) = 2;
- std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_;
-
- raw_ptr<UrgentMessageObserver> urgent_message_observer_ = nullptr;
+ std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_ GUARDED_BY(lock_);
};
namespace {
diff --git a/ipc/sync_socket_unittest.cc b/ipc/sync_socket_unittest.cc
index 03ece62..8a2c2c1 100644
--- a/ipc/sync_socket_unittest.cc
+++ b/ipc/sync_socket_unittest.cc
@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "base/sync_socket.h"
+
#include <stddef.h>
#include <stdio.h>
@@ -9,13 +11,14 @@
#include <sstream>
#include <string>
+#include "base/containers/span.h"
#include "base/functional/bind.h"
#include "base/location.h"
#include "base/memory/raw_ptr.h"
#include "base/run_loop.h"
-#include "base/sync_socket.h"
#include "base/task/single_thread_task_runner.h"
#include "base/threading/thread.h"
+#include "base/types/fixed_array.h"
#include "build/build_config.h"
#include "ipc/ipc_test_base.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -98,8 +101,8 @@
void SetHandle(base::SyncSocket::Handle handle) {
base::SyncSocket sync_socket(handle);
- EXPECT_EQ(sync_socket.Send(kHelloString, kHelloStringLength),
- kHelloStringLength);
+ auto bytes_to_send = base::as_byte_span(kHelloString);
+ EXPECT_EQ(sync_socket.Send(bytes_to_send), bytes_to_send.size());
IPC::Message* msg = new MsgClassResponse(kHelloString);
EXPECT_TRUE(chan_->Send(msg));
}
@@ -155,12 +158,14 @@
// string as was written on the SyncSocket. These are compared
// and a shutdown message is sent back to the server.
void OnMsgClassResponse(const std::string& str) {
+ // Account for the terminating null byte.
+ size_t expected_bytes_to_receive = str.length() + 1;
// We rely on the order of sync_socket.Send() and chan_->Send() in
// the SyncSocketServerListener object.
- EXPECT_EQ(kHelloStringLength, socket_->Peek());
- char buf[kHelloStringLength];
- socket_->Receive(static_cast<void*>(buf), kHelloStringLength);
- EXPECT_EQ(strcmp(str.c_str(), buf), 0);
+ EXPECT_EQ(socket_->Peek(), expected_bytes_to_receive);
+ base::FixedArray<char> buf(expected_bytes_to_receive);
+ socket_->Receive(base::as_writable_bytes(base::make_span(buf)));
+ EXPECT_EQ(strcmp(str.c_str(), buf.data()), 0);
// After receiving from the socket there should be no bytes left.
EXPECT_EQ(0U, socket_->Peek());
IPC::Message* msg = new MsgClassShutdown();
@@ -217,13 +222,13 @@
}
// A blocking read operation that will block the thread until it receives
-// |length| bytes of packets or Shutdown() is called on another thread.
-static void BlockingRead(base::SyncSocket* socket, char* buf,
- size_t length, size_t* received) {
- DCHECK_NE(buf, nullptr);
+// |buffer|'s length bytes of packets or Shutdown() is called on another thread.
+static void BlockingRead(base::SyncSocket* socket,
+ base::span<uint8_t> buffer,
+ size_t* received) {
// Notify the parent thread that we're up and running.
- socket->Send(kHelloString, kHelloStringLength);
- *received = socket->Receive(buf, length);
+ socket->Send(base::as_byte_span(kHelloString));
+ *received = socket->Receive(buffer);
}
// Tests that we can safely end a blocking Receive operation on one thread
@@ -239,13 +244,14 @@
char buf[0xff];
size_t received = 1U; // Initialize to an unexpected value.
worker.task_runner()->PostTask(
- FROM_HERE, base::BindOnce(&BlockingRead, &pair[0], &buf[0],
- std::size(buf), &received));
+ FROM_HERE,
+ base::BindOnce(&BlockingRead, &pair[0],
+ base::as_writable_bytes(base::make_span(buf)), &received));
// Wait for the worker thread to say hello.
char hello[kHelloStringLength] = {0};
- pair[1].Receive(&hello[0], sizeof(hello));
- EXPECT_EQ(0, strcmp(hello, kHelloString));
+ pair[1].Receive(base::as_writable_bytes(base::make_span(hello)));
+ EXPECT_EQ(strcmp(hello, kHelloString), 0);
// Give the worker a chance to start Receive().
base::PlatformThread::YieldCurrentThread();
@@ -270,24 +276,26 @@
char buf[kHelloStringLength] = {0};
size_t received = 1U; // Initialize to an unexpected value.
worker.task_runner()->PostTask(
- FROM_HERE, base::BindOnce(&BlockingRead, &pair[0], &buf[0],
- kHelloStringLength, &received));
+ FROM_HERE,
+ base::BindOnce(&BlockingRead, &pair[0],
+ base::as_writable_bytes(base::make_span(buf)), &received));
// Wait for the worker thread to say hello.
char hello[kHelloStringLength] = {0};
- pair[1].Receive(&hello[0], sizeof(hello));
+ pair[1].Receive(base::as_writable_bytes(base::make_span(hello)));
EXPECT_EQ(0, strcmp(hello, kHelloString));
// Give the worker a chance to start Receive().
base::PlatformThread::YieldCurrentThread();
// Send a message to the socket on the blocking thead, it should free the
// socket from Receive().
- pair[1].Send(kHelloString, kHelloStringLength);
+ auto bytes_to_send = base::as_byte_span(kHelloString);
+ pair[1].Send(bytes_to_send);
worker.Stop();
// Verify the socket has received the message.
EXPECT_TRUE(strcmp(buf, kHelloString) == 0);
- EXPECT_EQ(kHelloStringLength, received);
+ EXPECT_EQ(received, bytes_to_send.size());
}
// Tests that the write operation is non-blocking and returns immediately
@@ -298,7 +306,9 @@
// Fill up the buffer for one of the socket, Send() should not block the
// thread even when the buffer is full.
- while (pair[0].Send(kHelloString, kHelloStringLength) != 0) {}
+ auto bytes_to_send = base::as_byte_span(kHelloString);
+ while (pair[0].Send(bytes_to_send) != 0) {
+ }
// Data should be avialble on another socket.
size_t bytes_in_buffer = pair[1].Peek();
@@ -306,15 +316,15 @@
// No more data can be written to the buffer since socket has been full,
// verify that the amount of avialble data on another socket is unchanged.
- EXPECT_EQ(0U, pair[0].Send(kHelloString, kHelloStringLength));
+ EXPECT_EQ(pair[0].Send(bytes_to_send), 0U);
EXPECT_EQ(bytes_in_buffer, pair[1].Peek());
// Read from another socket to free some space for a new write.
char hello[kHelloStringLength] = {0};
- pair[1].Receive(&hello[0], sizeof(hello));
+ pair[1].Receive(base::as_writable_bytes(base::make_span(hello)));
// Should be able to write more data to the buffer now.
- EXPECT_EQ(kHelloStringLength, pair[0].Send(kHelloString, kHelloStringLength));
+ EXPECT_EQ(pair[0].Send(bytes_to_send), bytes_to_send.size());
}
} // namespace
diff --git a/mojo/public/cpp/bindings/README.md b/mojo/public/cpp/bindings/README.md
index 69d23af..95dc453 100644
--- a/mojo/public/cpp/bindings/README.md
+++ b/mojo/public/cpp/bindings/README.md
@@ -715,7 +715,7 @@
### Features
-Mojom `feature` generates a `base::FeatureList` with the given `name` and
+Mojom `feature` generates a `base::Feature` with the given `name` and
`default_state` (`true` => `ENABLED_BY_DEFAULT`). The feature can be accessed
and tested in C++ using the mapped name even if it is not used to mark any
interfaces or methods.