Fix memory leak in BlobRegistryImpl

Bug: 954458
Change-Id: I9980361d8b97c6de90d2784b229630cdabfbf0cd
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1575055
Commit-Queue: Tsuyoshi Horo <horo@chromium.org>
Reviewed-by: Marijn Kruisselbrink <mek@chromium.org>
Cr-Commit-Position: refs/heads/master@{#652802}
diff --git a/storage/browser/blob/blob_builder_from_stream.cc b/storage/browser/blob/blob_builder_from_stream.cc
index d13e1fd..e359852 100644
--- a/storage/browser/blob/blob_builder_from_stream.cc
+++ b/storage/browser/blob/blob_builder_from_stream.cc
@@ -305,9 +305,6 @@
     base::WeakPtr<BlobStorageContext> context,
     std::string content_type,
     std::string content_disposition,
-    uint64_t length_hint,
-    mojo::ScopedDataPipeConsumerHandle data,
-    blink::mojom::ProgressClientAssociatedPtrInfo progress_client,
     ResultCallback callback)
     : kMemoryBlockSize(std::min(
           kMaxMemoryChunkSize,
@@ -322,17 +319,22 @@
       content_disposition_(std::move(content_disposition)),
       weak_factory_(this) {
   DCHECK(context_);
-
-  context_->mutable_memory_controller()->CallWhenStorageLimitsAreKnown(
-      base::BindOnce(&BlobBuilderFromStream::AllocateMoreMemorySpace,
-                     weak_factory_.GetWeakPtr(), length_hint,
-                     std::move(progress_client), std::move(data)));
 }
 
 BlobBuilderFromStream::~BlobBuilderFromStream() {
   DCHECK(!callback_) << "BlobBuilderFromStream was destroyed before finishing";
 }
 
+void BlobBuilderFromStream::Start(
+    uint64_t length_hint,
+    mojo::ScopedDataPipeConsumerHandle data,
+    blink::mojom::ProgressClientAssociatedPtrInfo progress_client) {
+  context_->mutable_memory_controller()->CallWhenStorageLimitsAreKnown(
+      base::BindOnce(&BlobBuilderFromStream::AllocateMoreMemorySpace,
+                     weak_factory_.GetWeakPtr(), length_hint,
+                     std::move(progress_client), std::move(data)));
+}
+
 void BlobBuilderFromStream::Abort() {
   OnError(Result::kAborted);
 }
diff --git a/storage/browser/blob/blob_builder_from_stream.h b/storage/browser/blob/blob_builder_from_stream.h
index c313097..80bc732 100644
--- a/storage/browser/blob/blob_builder_from_stream.h
+++ b/storage/browser/blob/blob_builder_from_stream.h
@@ -55,12 +55,15 @@
       base::WeakPtr<BlobStorageContext> context,
       std::string content_type,
       std::string content_disposition,
-      uint64_t length_hint,
-      mojo::ScopedDataPipeConsumerHandle data,
-      blink::mojom::ProgressClientAssociatedPtrInfo progress_client,
       ResultCallback callback);
   ~BlobBuilderFromStream();
 
+  // This may call |callback| synchronously when |length_hint| is larger than
+  // the disk space.
+  void Start(uint64_t length_hint,
+             mojo::ScopedDataPipeConsumerHandle data,
+             blink::mojom::ProgressClientAssociatedPtrInfo progress_client);
+
   void Abort();
 
  private:
@@ -148,6 +151,7 @@
 
   std::string content_type_;
   std::string content_disposition_;
+
   std::vector<scoped_refptr<ShareableBlobDataItem>> items_;
   uint64_t current_total_size_ = 0;
   base::WeakPtr<BlobMemoryController::QuotaAllocationTask> pending_quota_task_;
diff --git a/storage/browser/blob/blob_builder_from_stream_unittest.cc b/storage/browser/blob/blob_builder_from_stream_unittest.cc
index 02f9d1d..2ca455be 100644
--- a/storage/browser/blob/blob_builder_from_stream_unittest.cc
+++ b/storage/browser/blob/blob_builder_from_stream_unittest.cc
@@ -94,14 +94,14 @@
     uint64_t length_hint = GetLengthHint(data.length());
     BlobBuilderFromStream* finished_builder = nullptr;
     BlobBuilderFromStream builder(
-        context_->AsWeakPtr(), kContentType, kContentDisposition, length_hint,
-        std::move(pipe.consumer_handle), nullptr,
+        context_->AsWeakPtr(), kContentType, kContentDisposition,
         base::BindLambdaForTesting([&](BlobBuilderFromStream* result_builder,
                                        std::unique_ptr<BlobDataHandle> blob) {
           finished_builder = result_builder;
           result = std::move(blob);
           loop.Quit();
         }));
+    builder.Start(length_hint, std::move(pipe.consumer_handle), nullptr);
 
     // Make sure the initial memory allocation done by the builder matches the
     // length hint passed in.
@@ -195,8 +195,7 @@
   base::RunLoop loop;
   BlobBuilderFromStream* builder_ptr = nullptr;
   auto builder = std::make_unique<BlobBuilderFromStream>(
-      context_->AsWeakPtr(), "", "", GetLengthHint(16),
-      std::move(pipe.consumer_handle), nullptr,
+      context_->AsWeakPtr(), "", "",
       base::BindLambdaForTesting([&](BlobBuilderFromStream* result_builder,
                                      std::unique_ptr<BlobDataHandle> blob) {
         EXPECT_EQ(builder_ptr, result_builder);
@@ -204,6 +203,7 @@
         loop.Quit();
       }));
   builder_ptr = builder.get();
+  builder->Start(GetLengthHint(16), std::move(pipe.consumer_handle), nullptr);
   builder->Abort();
   builder.reset();
   loop.Run();
@@ -361,13 +361,13 @@
   base::RunLoop loop;
   std::unique_ptr<BlobDataHandle> result;
   BlobBuilderFromStream builder(
-      context_->AsWeakPtr(), "", "", kLengthHint,
-      std::move(pipe.consumer_handle), nullptr,
+      context_->AsWeakPtr(), "", "",
       base::BindLambdaForTesting(
           [&](BlobBuilderFromStream*, std::unique_ptr<BlobDataHandle> blob) {
             result = std::move(blob);
             loop.Quit();
           }));
+  builder.Start(kLengthHint, std::move(pipe.consumer_handle), nullptr);
   pipe.producer_handle.reset();
   loop.Run();
 
@@ -384,13 +384,13 @@
   base::RunLoop loop;
   std::unique_ptr<BlobDataHandle> result;
   BlobBuilderFromStream builder(
-      context_->AsWeakPtr(), "", "", kLengthHint,
-      std::move(pipe.consumer_handle), nullptr,
+      context_->AsWeakPtr(), "", "",
       base::BindLambdaForTesting(
           [&](BlobBuilderFromStream*, std::unique_ptr<BlobDataHandle> blob) {
             result = std::move(blob);
             loop.Quit();
           }));
+  builder.Start(kLengthHint, std::move(pipe.consumer_handle), nullptr);
   pipe.producer_handle.reset();
   loop.Run();
 
@@ -413,13 +413,14 @@
   base::RunLoop loop;
   std::unique_ptr<BlobDataHandle> result;
   BlobBuilderFromStream builder(
-      context_->AsWeakPtr(), "", "", GetLengthHint(kData.size()),
-      std::move(pipe.consumer_handle), progress_client_ptr.PassInterface(),
+      context_->AsWeakPtr(), "", "",
       base::BindLambdaForTesting(
           [&](BlobBuilderFromStream*, std::unique_ptr<BlobDataHandle> blob) {
             result = std::move(blob);
             loop.Quit();
           }));
+  builder.Start(GetLengthHint(kData.size()), std::move(pipe.consumer_handle),
+                progress_client_ptr.PassInterface());
   mojo::BlockingCopyFromString(kData, pipe.producer_handle);
   pipe.producer_handle.reset();
 
@@ -447,13 +448,13 @@
   base::RunLoop loop;
   std::unique_ptr<BlobDataHandle> result;
   BlobBuilderFromStream builder(
-      context_->AsWeakPtr(), kContentType, kContentDisposition, kData.size(),
-      std::move(pipe.consumer_handle), nullptr,
+      context_->AsWeakPtr(), kContentType, kContentDisposition,
       base::BindLambdaForTesting([&](BlobBuilderFromStream* result_builder,
                                      std::unique_ptr<BlobDataHandle> blob) {
         result = std::move(blob);
         loop.Quit();
       }));
+  builder.Start(kData.size(), std::move(pipe.consumer_handle), nullptr);
 
   context_->set_limits_for_testing(limits_);
   auto data_producer = std::make_unique<mojo::StringDataPipeProducer>(
diff --git a/storage/browser/blob/blob_registry_impl.cc b/storage/browser/blob/blob_registry_impl.cc
index 792d13c..aa81ace 100644
--- a/storage/browser/blob/blob_registry_impl.cc
+++ b/storage/browser/blob/blob_registry_impl.cc
@@ -567,11 +567,15 @@
     return;
   }
 
-  blobs_being_streamed_.insert(std::make_unique<BlobBuilderFromStream>(
-      context_, content_type, content_disposition, expected_length,
-      std::move(data), std::move(progress_client),
-      base::BindOnce(&BlobRegistryImpl::StreamingBlobDone,
-                     base::Unretained(this), std::move(callback))));
+  std::unique_ptr<BlobBuilderFromStream> blob_builder =
+      std::make_unique<BlobBuilderFromStream>(
+          context_, content_type, content_disposition,
+          base::BindOnce(&BlobRegistryImpl::StreamingBlobDone,
+                         base::Unretained(this), std::move(callback)));
+  BlobBuilderFromStream* blob_builder_ptr = blob_builder.get();
+  blobs_being_streamed_.insert(std::move(blob_builder));
+  blob_builder_ptr->Start(expected_length, std::move(data),
+                          std::move(progress_client));
 }
 
 void BlobRegistryImpl::GetBlobFromUUID(blink::mojom::BlobRequest blob,
diff --git a/storage/browser/blob/blob_registry_impl.h b/storage/browser/blob/blob_registry_impl.h
index 3460984..653feea 100644
--- a/storage/browser/blob/blob_registry_impl.h
+++ b/storage/browser/blob/blob_registry_impl.h
@@ -67,6 +67,10 @@
     return blobs_under_construction_.size();
   }
 
+  size_t BlobsBeingStreamedForTesting() const {
+    return blobs_being_streamed_.size();
+  }
+
   using URLStoreCreationHook = base::RepeatingCallback<void(
       mojo::StrongAssociatedBindingPtr<blink::mojom::BlobURLStore>)>;
   static void SetURLStoreCreationHookForTesting(URLStoreCreationHook* hook);
diff --git a/storage/browser/blob/blob_registry_impl_unittest.cc b/storage/browser/blob/blob_registry_impl_unittest.cc
index c63e23ba..31be8a2 100644
--- a/storage/browser/blob/blob_registry_impl_unittest.cc
+++ b/storage/browser/blob/blob_registry_impl_unittest.cc
@@ -171,6 +171,10 @@
     return registry_impl_->BlobsUnderConstructionForTesting();
   }
 
+  size_t BlobsBeingStreamed() {
+    return registry_impl_->BlobsBeingStreamedForTesting();
+  }
+
  protected:
   base::ScopedTempDir data_dir_;
   base::test::ScopedTaskEnvironment scoped_task_environment_;
@@ -1014,18 +1018,20 @@
   mojo::AssociatedBinding<blink::mojom::ProgressClient> progress_binding(
       &progress_client, MakeRequest(&progress_client_ptr));
 
-  mojo::DataPipe pipe;
+  mojo::ScopedDataPipeProducerHandle producer;
+  mojo::ScopedDataPipeConsumerHandle consumer;
+  mojo::CreateDataPipe(nullptr, &producer, &consumer);
   blink::mojom::SerializedBlobPtr blob;
   base::RunLoop loop;
   registry_->RegisterFromStream(
-      kContentType, kContentDisposition, kData.length(),
-      std::move(pipe.consumer_handle), std::move(progress_client_ptr),
+      kContentType, kContentDisposition, kData.length(), std::move(consumer),
+      std::move(progress_client_ptr),
       base::BindLambdaForTesting([&](blink::mojom::SerializedBlobPtr result) {
         blob = std::move(result);
         loop.Quit();
       }));
-  mojo::BlockingCopyFromString(kData, pipe.producer_handle);
-  pipe.producer_handle.reset();
+  mojo::BlockingCopyFromString(kData, producer);
+  producer.reset();
   loop.Run();
 
   ASSERT_TRUE(blob);
@@ -1038,6 +1044,39 @@
 
   EXPECT_EQ(kData.length(), progress_client.total_size);
   EXPECT_GE(progress_client.call_count, 1);
+
+  EXPECT_EQ(0u, BlobsBeingStreamed());
+}
+
+TEST_F(BlobRegistryImplTest, RegisterFromStream_NoDiskSpace) {
+  const std::string kData =
+      base::RandBytesAsString(kTestBlobStorageMaxDiskSpace + 1);
+  const std::string kContentType = "content/type";
+  const std::string kContentDisposition = "disposition";
+
+  FakeProgressClient progress_client;
+  blink::mojom::ProgressClientAssociatedPtrInfo progress_client_ptr;
+  mojo::AssociatedBinding<blink::mojom::ProgressClient> progress_binding(
+      &progress_client, MakeRequest(&progress_client_ptr));
+
+  mojo::ScopedDataPipeProducerHandle producer;
+  mojo::ScopedDataPipeConsumerHandle consumer;
+  mojo::CreateDataPipe(nullptr, &producer, &consumer);
+  blink::mojom::SerializedBlobPtr blob;
+  base::RunLoop loop;
+  registry_->RegisterFromStream(
+      kContentType, kContentDisposition, kData.length(), std::move(consumer),
+      std::move(progress_client_ptr),
+      base::BindLambdaForTesting([&](blink::mojom::SerializedBlobPtr result) {
+        blob = std::move(result);
+        loop.Quit();
+      }));
+  mojo::BlockingCopyFromString(kData, producer);
+  producer.reset();
+  loop.Run();
+
+  EXPECT_FALSE(blob);
+  EXPECT_EQ(0u, BlobsBeingStreamed());
 }
 
 TEST_F(BlobRegistryImplTest, DestroyWithUnfinishedStream) {