[FilesTiering] Start new pin batches on completion of downloads

Pinning a file simply sets a value in the drivefs db, but does not
indicate the file is actually available offline. To ensure files are
actually made available offline and are appropriately batched, only
start new batches once the old one has finished (observing the status
via `OnSyncingStatusUpdate`).

Bug: b:259454320
Test: chromeos_unittests --gtest_filter=*DriveFsPinManager*
Change-Id: I757f51ae3f86f39c924e1a4789d8f90e3851c0d7
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4060136
Reviewed-by: Bo Majewski <majewski@chromium.org>
Commit-Queue: Ben Reich <benreich@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1079037}
diff --git a/chromeos/ash/components/drivefs/drivefs_pin_manager.cc b/chromeos/ash/components/drivefs/drivefs_pin_manager.cc
index aee7a49a..4291d7b 100644
--- a/chromeos/ash/components/drivefs/drivefs_pin_manager.cc
+++ b/chromeos/ash/components/drivefs/drivefs_pin_manager.cc
@@ -6,7 +6,6 @@
 
 #include <vector>
 
-#include "base/barrier_callback.h"
 #include "base/functional/callback_forward.h"
 #include "base/logging.h"
 #include "base/ranges/algorithm.h"
@@ -168,6 +167,7 @@
 }
 
 void DriveFsPinManager::Complete(PinError status) {
+  weak_ptr_factory_.InvalidateWeakPtrs();
   search_query_.reset();
   free_space_ = 0;
   size_required_ = 0;
@@ -226,9 +226,6 @@
     return;
   }
 
-  auto pinned_all = base::BarrierCallback<DrivePathAndStatus>(
-      unpinned_items, base::BindOnce(&DriveFsPinManager::OnFilesPinned,
-                                     weak_ptr_factory_.GetWeakPtr()));
   for (const auto& item : items.value()) {
     if (item->metadata->pinned) {
       VLOG(1) << "Item is already pinned, ignoring when batch pinning";
@@ -237,33 +234,59 @@
     base::FilePath path(item->path);
     drivefs_interface_->SetPinned(
         path, /*pinned=*/true,
-        base::BindOnce(
-            [](const base::FilePath& path, drive::FileError status) {
-              DrivePathAndStatus path_status = {path, status};
-              return path_status;
-            },
-            path)
-            .Then(pinned_all));
+        base::BindOnce(&DriveFsPinManager::OnFilePinned,
+                       weak_ptr_factory_.GetWeakPtr(), path));
   }
 }
 
-void DriveFsPinManager::OnFilesPinned(
-    std::vector<DrivePathAndStatus> pinned_files) {
-  for (const auto& [path, status] : pinned_files) {
-    if (status != drive::FILE_ERROR_OK) {
-      LOG(ERROR) << "Failed pinning an item: " << status;
-      VLOG(2) << "Path that failed to pin: " << path.value() << " with error "
-              << drive::FileErrorToString(status);
-      std::move(complete_callback_).Run(PinError::kErrorFailedToPinItem);
-      return;
-    }
+void DriveFsPinManager::OnFilePinned(const base::FilePath& path,
+                                     drive::FileError status) {
+  if (status != drive::FILE_ERROR_OK) {
+    LOG(ERROR) << "Failed pinning an item: " << status;
+    VLOG(2) << "Path that failed to pin: " << path.value() << " with error "
+            << drive::FileErrorToString(status);
+    Complete(PinError::kErrorFailedToPinItem);
+    return;
   }
 
-  VLOG(2) << "Finished setting pinned status on " << pinned_files.size()
-          << " items";
-  search_query_->GetNextPage(
-      base::BindOnce(&DriveFsPinManager::OnSearchResultsForPinning,
-                     weak_ptr_factory_.GetWeakPtr()));
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+  // Emplace an item with no progress, these values (i.e. 0,0) will get updated
+  // in the `OnSyncingStatusUpdate`.
+  in_progress_items_.try_emplace(path.value(), /*bytes_transferred=*/0,
+                                 /*bytes_to_transfer=*/0);
+}
+
+void DriveFsPinManager::OnSyncingStatusUpdate(
+    const mojom::SyncingStatus& status) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+
+  for (const auto& item : status.item_events) {
+    auto cloned_item = item.Clone();
+    auto transferred = in_progress_items_.find(cloned_item->path);
+    if (transferred == in_progress_items_.end()) {
+      continue;
+    }
+    // TODO(b/259454320): Hosted files (e.g. gdoc) do not send an update via the
+    // `OnSyncingStatusUpdate` method. Need to add a method to cleanse the
+    // `in_progress_items_` map to ensure any values that are small enough or
+    // optimistically pinned get removed.
+    if (cloned_item->state == mojom::ItemEvent::State::kCompleted) {
+      in_progress_items_.erase(cloned_item->path);
+      VLOG(2) << "Removing completed items from sync map: "
+              << cloned_item->path.c_str();
+      continue;
+    }
+    transferred->second.first = cloned_item->bytes_transferred;
+    transferred->second.second = cloned_item->bytes_to_transfer;
+  }
+
+  if (in_progress_items_.size() == 0) {
+    VLOG(2) << "Current batch below threshold (" << in_progress_items_.size()
+            << "), starting new batch";
+    search_query_->GetNextPage(
+        base::BindOnce(&DriveFsPinManager::OnSearchResultsForPinning,
+                       weak_ptr_factory_.GetWeakPtr()));
+  }
 }
 
 }  // namespace drivefs::pinning
diff --git a/chromeos/ash/components/drivefs/drivefs_pin_manager.h b/chromeos/ash/components/drivefs/drivefs_pin_manager.h
index 41048e0..bdc22692 100644
--- a/chromeos/ash/components/drivefs/drivefs_pin_manager.h
+++ b/chromeos/ash/components/drivefs/drivefs_pin_manager.h
@@ -5,13 +5,17 @@
 #ifndef CHROMEOS_ASH_COMPONENTS_DRIVEFS_DRIVEFS_PIN_MANAGER_H_
 #define CHROMEOS_ASH_COMPONENTS_DRIVEFS_DRIVEFS_PIN_MANAGER_H_
 
+#include <utility>
 #include <vector>
 
 #include "base/component_export.h"
 #include "base/functional/callback.h"
 #include "base/memory/raw_ptr.h"
 #include "base/memory/weak_ptr.h"
+#include "base/sequence_checker.h"
+#include "base/thread_annotations.h"
 #include "base/timer/elapsed_timer.h"
+#include "chromeos/ash/components/drivefs/drivefs_host_observer.h"
 #include "chromeos/ash/components/drivefs/mojom/drivefs.mojom.h"
 #include "components/drive/file_errors.h"
 #include "mojo/public/cpp/bindings/remote.h"
@@ -60,7 +64,8 @@
 //  - Maintain pinning of files that are newly created.
 //  - Rebuild the progress of bulk pinned items (if turned off mid way through a
 //    bulk pinning event).
-class COMPONENT_EXPORT(CHROMEOS_ASH_COMPONENTS_DRIVEFS) DriveFsPinManager {
+class COMPONENT_EXPORT(CHROMEOS_ASH_COMPONENTS_DRIVEFS) DriveFsPinManager
+    : public DriveFsHostObserver {
  public:
   DriveFsPinManager(bool enabled,
                     const base::FilePath& profile_path,
@@ -73,7 +78,7 @@
   DriveFsPinManager(const DriveFsPinManager&) = delete;
   DriveFsPinManager& operator=(const DriveFsPinManager&) = delete;
 
-  ~DriveFsPinManager();
+  ~DriveFsPinManager() override;
 
   // Enable or disable the bulk pinning.
   void SetBulkPinningEnabled(bool enabled) { enabled_ = enabled; }
@@ -85,6 +90,9 @@
   // pinning has completed.
   void Start(base::OnceCallback<void(PinError)> complete_callback);
 
+  // drivefs::DriveFsHostObserver
+  void OnSyncingStatusUpdate(const mojom::SyncingStatus& status) override;
+
  private:
   // Invoked on retrieval of available space in the `~/GCache` directory.
   void OnFreeDiskSpaceRetrieved(int64_t free_space);
@@ -109,7 +117,11 @@
       drive::FileError error,
       absl::optional<std::vector<drivefs::mojom::QueryItemPtr>> items);
 
-  void OnFilesPinned(std::vector<DrivePathAndStatus> pinned_files);
+  // After a file has been pinned, this ensures the in progress map has the item
+  // emplaced. Note the file being pinned is just an update in drivefs, not the
+  // actually completion of the file being downloaded, that is monitored via
+  // `OnSyncingStatusUpdate`.
+  void OnFilePinned(const base::FilePath& path, drive::FileError status);
 
   bool enabled_ = false;
   int64_t size_required_ = 0;
@@ -122,6 +134,13 @@
   mojo::Remote<mojom::SearchQuery> search_query_;
   base::ElapsedTimer timer_;
 
+  SEQUENCE_CHECKER(sequence_checker_);
+  // A map that tracks the in progress items by their key to a pair of `int64_t`
+  // with `first` being the number of bytes transferred and `second` being the
+  // `bytes_to_transfer` i.e. the total bytes of the syncing file.
+  using InProgressMap = std::map<std::string, std::pair<int64_t, int64_t>>;
+  InProgressMap in_progress_items_ GUARDED_BY_CONTEXT(sequence_checker_);
+
   base::WeakPtrFactory<DriveFsPinManager> weak_ptr_factory_{this};
 };
 
diff --git a/chromeos/ash/components/drivefs/drivefs_pin_manager_unittest.cc b/chromeos/ash/components/drivefs/drivefs_pin_manager_unittest.cc
index fc89360..07d44ad 100644
--- a/chromeos/ash/components/drivefs/drivefs_pin_manager_unittest.cc
+++ b/chromeos/ash/components/drivefs/drivefs_pin_manager_unittest.cc
@@ -34,6 +34,7 @@
 // for the pinning manager.
 struct DriveItem {
   int64_t size;
+  base::FilePath path;
   bool pinned;
 };
 
@@ -50,6 +51,7 @@
     items.back()->metadata->capabilities = mojom::Capabilities::New();
     items.back()->metadata->size = item.size;
     items.back()->metadata->pinned = item.pinned;
+    items.back()->path = item.path;
   }
   *arg0 = std::move(items);
 }
@@ -134,6 +136,33 @@
     gcache_dir_ = temp_dir_.GetPath().Append("GCache");
   }
 
+  mojom::SyncingStatusPtr CreateSyncingStatusUpdate(
+      const std::vector<DriveItem> items) {
+    mojom::SyncingStatusPtr status = mojom::SyncingStatus::New();
+
+    std::vector<mojom::ItemEventPtr> item_events;
+    for (const auto& item : items) {
+      if (item.pinned) {
+        continue;
+      }
+      mojom::ItemEventPtr item_event = mojom::ItemEvent::New();
+      item_event->path = item.path.value();
+      item_event->state = mojom::ItemEvent::State::kQueued;
+      item_event->bytes_to_transfer = item.size;
+      item_events.push_back(std::move(item_event));
+    }
+
+    status->item_events = std::move(item_events);
+    return status;
+  }
+
+  void ChangeAllItemEventsToState(std::vector<mojom::ItemEventPtr>& item_events,
+                                  mojom::ItemEvent::State state) {
+    for (auto& item : item_events) {
+      item->state = state;
+    }
+  }
+
   base::test::TaskEnvironment task_environment_;
   base::ScopedTempDir temp_dir_;
   base::FilePath gcache_dir_;
@@ -292,7 +321,9 @@
   base::RunLoop run_loop;
 
   std::vector<DriveItem> expected_drive_items = {
-      {.size = 128}, {.size = 128}, {.size = 128, .pinned = true}};
+      {.size = 128, .path = base::FilePath("/a")},
+      {.size = 128, .path = base::FilePath("/b")},
+      {.size = 128, .path = base::FilePath("/c"), .pinned = true}};
 
   EXPECT_CALL(mock_drivefs_, OnStartSearchQuery(_)).Times(2);
   EXPECT_CALL(mock_drivefs_, OnGetNextPage(_))
@@ -301,26 +332,55 @@
                       Return(drive::FileError::FILE_ERROR_OK)))
       .WillOnce(DoAll(PopulateNoSearchItems(),
                       Return(drive::FileError::FILE_ERROR_OK)))
-      // Results returned when actually performing the pinning.
+      // Results returned when actually performing the pinning, the final
+      // response (i.e. PopulateNoSearchItems()) happens after the
+      // `OnSyncingStatusUpdate` instead.
       .WillOnce(DoAll(PopulateSearchItems(expected_drive_items),
-                      Return(drive::FileError::FILE_ERROR_OK)))
-      .WillOnce(DoAll(PopulateNoSearchItems(),
                       Return(drive::FileError::FILE_ERROR_OK)));
-  EXPECT_CALL(mock_callback, Run(PinError::kSuccess))
-      .WillOnce(RunClosure(run_loop.QuitClosure()));
   EXPECT_CALL(*mock_free_disk_space, AmountOfFreeDiskSpace(gcache_dir_, _))
       .WillOnce(RunOnceCallback<1>(1024));  // 1 MB.
   EXPECT_CALL(mock_drivefs_, SetPinned(_, true, _))
-      // SetPinned should only be called twice with the third file being ignored
-      // as it is already pinned.
       .Times(2)
-      .WillRepeatedly(RunOnceCallback<2>(drive::FILE_ERROR_OK));
+      .WillOnce(RunOnceCallback<2>(drive::FILE_ERROR_OK))
+      // `RunOnceCallback` can't be chained together in a `DoAll` action
+      // combinator, so use an inline lambda instead.
+      .WillOnce(
+          [&run_loop](const base::FilePath& path, bool pinned,
+                      base::OnceCallback<void(drive::FileError)> callback) {
+            std::move(callback).Run(drive::FILE_ERROR_OK);
+            run_loop.QuitClosure().Run();
+          });
 
   auto manager = std::make_unique<DriveFsPinManager>(
       /*enabled=*/true, temp_dir_.GetPath(), &mock_drivefs_,
       std::move(mock_free_disk_space));
   manager->Start(mock_callback.Get());
   run_loop.Run();
+
+  // Create the syncing status update and emit the update to the manager.
+  mojom::SyncingStatusPtr status =
+      CreateSyncingStatusUpdate(expected_drive_items);
+  manager->OnSyncingStatusUpdate(*status);
+
+  // When all items are in progress, they should not start iterating over the
+  // next search page.
+  ChangeAllItemEventsToState(status->item_events,
+                             mojom::ItemEvent::State::kInProgress);
+  manager->OnSyncingStatusUpdate(*status);
+
+  // Flipping all the events to `kCompleted` should then start the next query.
+  // By populating no search items this indicates the end of the available items
+  // and thus it finished.
+  base::RunLoop new_run_loop;
+  EXPECT_CALL(mock_drivefs_, OnGetNextPage(_))
+      .WillOnce(DoAll(PopulateNoSearchItems(),
+                      Return(drive::FileError::FILE_ERROR_OK)));
+  EXPECT_CALL(mock_callback, Run(PinError::kSuccess))
+      .WillOnce(RunClosure(new_run_loop.QuitClosure()));
+  ChangeAllItemEventsToState(status->item_events,
+                             mojom::ItemEvent::State::kCompleted);
+  manager->OnSyncingStatusUpdate(*status);
+  new_run_loop.Run();
 }
 
 }  // namespace