Fix a race condition where content is incorrectly garbage collected.
1. A new session is created with strategy REQUEST_WITH_CONTENT.
2. Initialization hasn't completed, so the new session is not included in
   SessionCache when GC is enqueued.
3. The request completes with a new HEAD, GC runs.
4. GC deletes content that was branched into the session in step #1.

PiperOrigin-RevId: 261332120
Change-Id: Idbbab6732f28315cfb2ed4f0cfc7a097d957d5ef
diff --git a/src/main/java/com/google/android/libraries/feed/api/host/config/Configuration.java b/src/main/java/com/google/android/libraries/feed/api/host/config/Configuration.java
index 5aefa7e..e82350a 100644
--- a/src/main/java/com/google/android/libraries/feed/api/host/config/Configuration.java
+++ b/src/main/java/com/google/android/libraries/feed/api/host/config/Configuration.java
@@ -191,6 +191,11 @@
     return values.containsKey(key);
   }
 
+  /** Returns a {@link Builder} for this {@link Configuration}. */
+  public Builder toBuilder() {
+    return new Builder(values);
+  }
+
   /** Returns a default {@link Configuration}. */
   public static Configuration getDefaultInstance() {
     return new Builder().build();
@@ -198,7 +203,15 @@
 
   /** Builder class used to create {@link Configuration} objects. */
   public static final class Builder {
-    private final HashMap<String, Object> values = new HashMap<>();
+    private final HashMap<String, Object> values;
+
+    private Builder(HashMap<String, Object> values) {
+      this.values = new HashMap<>(values);
+    }
+
+    public Builder() {
+      this(new HashMap<>());
+    }
 
     public Builder put(@ConfigKey String key, String value) {
       values.put(key, value);
diff --git a/src/main/java/com/google/android/libraries/feed/api/host/logging/Task.java b/src/main/java/com/google/android/libraries/feed/api/host/logging/Task.java
index 5158b68..026e054 100644
--- a/src/main/java/com/google/android/libraries/feed/api/host/logging/Task.java
+++ b/src/main/java/com/google/android/libraries/feed/api/host/logging/Task.java
@@ -56,7 +56,6 @@
   Task.SESSION_MANAGER_TRIGGER_REFRESH,
   Task.SESSION_MUTATION,
   Task.TASK_QUEUE_INITIALIZE,
-  Task.UPDATE_CONTENT_TRACKER,
   Task.UPLOAD_ALL_ACTIONS_FOR_URL,
   Task.NEXT_VALUE,
 })
@@ -95,7 +94,6 @@
   int SESSION_MANAGER_TRIGGER_REFRESH = 28;
   int SESSION_MUTATION = 29;
   int TASK_QUEUE_INITIALIZE = 30;
-  int UPDATE_CONTENT_TRACKER = 31;
   int UPLOAD_ALL_ACTIONS_FOR_URL = 32;
 
   // The next value that should be used when adding additional values to the IntDef.
diff --git a/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeDirectExecutor.java b/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeDirectExecutor.java
index 92cc1ab..74659f9 100644
--- a/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeDirectExecutor.java
+++ b/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeDirectExecutor.java
@@ -14,23 +14,57 @@
 
 package com.google.android.libraries.feed.common.concurrent.testing;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /** Fake {@link Executor} that enforces a background thread. */
 public final class FakeDirectExecutor implements Executor {
+  private final AtomicBoolean currentlyExecutingTasks = new AtomicBoolean();
   private final FakeThreadUtils fakeThreadUtils;
+  private final List<Runnable> tasksToRun = new ArrayList<>();
+  private final boolean shouldQueueTasks;
 
-  public FakeDirectExecutor(FakeThreadUtils fakeThreadUtils) {
+  public static FakeDirectExecutor runTasksImmediately(FakeThreadUtils fakeThreadUtils) {
+    return new FakeDirectExecutor(fakeThreadUtils, /* shouldQueueTasks= */ false);
+  }
+
+  public static FakeDirectExecutor queueAllTasks(FakeThreadUtils fakeThreadUtils) {
+    return new FakeDirectExecutor(fakeThreadUtils, /* shouldQueueTasks= */ true);
+  }
+
+  private FakeDirectExecutor(FakeThreadUtils fakeThreadUtils, boolean shouldQueueTasks) {
     this.fakeThreadUtils = fakeThreadUtils;
+    this.shouldQueueTasks = shouldQueueTasks;
   }
 
   @Override
   public void execute(Runnable command) {
+    tasksToRun.add(command);
+    if (!shouldQueueTasks) {
+      runAllTasks();
+    }
+  }
+
+  public void runAllTasks() {
+    if (currentlyExecutingTasks.getAndSet(true)) {
+      return;
+    }
+
     boolean policy = fakeThreadUtils.enforceMainThread(false);
     try {
-      command.run();
+      while (!tasksToRun.isEmpty()) {
+        Runnable task = tasksToRun.remove(0);
+        task.run();
+      }
     } finally {
       fakeThreadUtils.enforceMainThread(policy);
+      currentlyExecutingTasks.set(false);
     }
   }
+
+  public boolean hasTasks() {
+    return !tasksToRun.isEmpty();
+  }
 }
diff --git a/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeTaskQueue.java b/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeTaskQueue.java
index c2e4c0e..758cf68 100644
--- a/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeTaskQueue.java
+++ b/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeTaskQueue.java
@@ -27,7 +27,7 @@
   public FakeTaskQueue(FakeClock fakeClock, FakeThreadUtils fakeThreadUtils) {
     super(
         new FakeBasicLoggingApi(),
-        new FakeDirectExecutor(fakeThreadUtils),
+        FakeDirectExecutor.runTasksImmediately(fakeThreadUtils),
         FakeMainThreadRunner.create(fakeClock),
         fakeClock);
   }
diff --git a/src/main/java/com/google/android/libraries/feed/common/testing/InfraIntegrationScope.java b/src/main/java/com/google/android/libraries/feed/common/testing/InfraIntegrationScope.java
index d39afed..749e022 100644
--- a/src/main/java/com/google/android/libraries/feed/common/testing/InfraIntegrationScope.java
+++ b/src/main/java/com/google/android/libraries/feed/common/testing/InfraIntegrationScope.java
@@ -51,7 +51,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -69,8 +68,8 @@
 
   private final Configuration configuration;
   private final ContentStorageDirect contentStorage;
-  private final Executor executor;
   private final FakeClock fakeClock;
+  private final FakeDirectExecutor fakeDirectExecutor;
   private final FakeMainThreadRunner fakeMainThreadRunner;
   private final FakeFeedRequestManager fakeFeedRequestManager;
   private final FakeThreadUtils fakeThreadUtils;
@@ -86,7 +85,7 @@
 
   private InfraIntegrationScope(
       FakeThreadUtils fakeThreadUtils,
-      Executor executor,
+      FakeDirectExecutor fakeDirectExecutor,
       SchedulerApi schedulerApi,
       FakeClock fakeClock,
       Configuration configuration,
@@ -96,8 +95,8 @@
     this.fakeClock = fakeClock;
     this.configuration = configuration;
     this.contentStorage = contentStorage;
-    this.executor = executor;
     this.journalStorage = journalStorage;
+    this.fakeDirectExecutor = fakeDirectExecutor;
     this.fakeMainThreadRunner = fakeMainThreadRunner;
     this.schedulerApi = schedulerApi;
     this.fakeThreadUtils = fakeThreadUtils;
@@ -106,7 +105,8 @@
     FakeBasicLoggingApi fakeBasicLoggingApi = new FakeBasicLoggingApi();
 
     FeedExtensionRegistry extensionRegistry = new FeedExtensionRegistry(new ExtensionProvider());
-    taskQueue = new TaskQueue(fakeBasicLoggingApi, executor, fakeMainThreadRunner, fakeClock);
+    taskQueue =
+        new TaskQueue(fakeBasicLoggingApi, fakeDirectExecutor, fakeMainThreadRunner, fakeClock);
     store =
         new FeedStore(
             timingUtils,
@@ -173,6 +173,10 @@
     return fakeClock;
   }
 
+  public FakeDirectExecutor getFakeDirectExecutor() {
+    return fakeDirectExecutor;
+  }
+
   public FakeMainThreadRunner getFakeMainThreadRunner() {
     return fakeMainThreadRunner;
   }
@@ -205,7 +209,7 @@
   public InfraIntegrationScope clone() {
     return new InfraIntegrationScope(
         fakeThreadUtils,
-        executor,
+        fakeDirectExecutor,
         schedulerApi,
         fakeClock,
         configuration,
@@ -229,6 +233,8 @@
     private final FakeThreadUtils fakeThreadUtils = FakeThreadUtils.withThreadChecks();
 
     private Configuration configuration = Configuration.getDefaultInstance();
+    private FakeDirectExecutor fakeDirectExecutor =
+        FakeDirectExecutor.runTasksImmediately(fakeThreadUtils);
     private SchedulerApi schedulerApi = new FakeSchedulerApi(fakeThreadUtils);
 
     public Builder() {}
@@ -243,9 +249,14 @@
       return this;
     }
 
+    public Builder withQueuingTasks() {
+      fakeDirectExecutor = FakeDirectExecutor.queueAllTasks(fakeThreadUtils);
+      return this;
+    }
+
     public Builder withTimeoutSessionConfiguration(long timeoutMs) {
-      this.configuration =
-          new Configuration.Builder()
+      configuration =
+          configuration.toBuilder()
               .put(ConfigKey.USE_TIMEOUT_SCHEDULER, USE_TIMEOUT_SCHEDULER)
               .put(ConfigKey.TIMEOUT_TIMEOUT_MS, timeoutMs)
               .build();
@@ -255,7 +266,7 @@
     public InfraIntegrationScope build() {
       return new InfraIntegrationScope(
           fakeThreadUtils,
-          new FakeDirectExecutor(fakeThreadUtils),
+          fakeDirectExecutor,
           schedulerApi,
           fakeClock,
           configuration,
diff --git a/src/main/java/com/google/android/libraries/feed/feedsessionmanager/internal/SessionCache.java b/src/main/java/com/google/android/libraries/feed/feedsessionmanager/internal/SessionCache.java
index 14c2e4c..c9c9a8a 100644
--- a/src/main/java/com/google/android/libraries/feed/feedsessionmanager/internal/SessionCache.java
+++ b/src/main/java/com/google/android/libraries/feed/feedsessionmanager/internal/SessionCache.java
@@ -416,7 +416,6 @@
 
     HeadSessionImpl headSession = Validators.checkNotNull(head);
     String headSessionId = headSession.getSessionId();
-    List<SessionContentTracker> sessionContentTrackers = new ArrayList<>();
     boolean cleanupSessions = false;
     for (StreamSession session : sessionList) {
       SessionMetadata metadata = getOrCreateSessionMetadata(session);
@@ -448,28 +447,6 @@
         }
         sessionsMetadata.put(session.getSessionId(), metadata);
       }
-      SessionContentTracker sessionContentTracker =
-          new SessionContentTracker(/* supportsClearAll= */ false);
-      // Unbound sessions are able to be created through restore so we only create content trackers
-      // here and delay creation of unbound sessions until they are actually needed.
-      sessionContentTrackers.add(sessionContentTracker);
-
-      // Task which updates the newly created content tracker.
-      Runnable updateContentTracker =
-          () -> {
-            Logger.i(TAG, "Task: updateContentTracker %s", sessionId);
-            ElapsedTimeTracker timeTracker = timingUtils.getElapsedTimeTracker(TAG);
-            Result<List<StreamStructure>> streamStructuresResult =
-                store.getStreamStructures(sessionId);
-            if (streamStructuresResult.isSuccessful()) {
-              sessionContentTracker.update(streamStructuresResult.getValue());
-            } else {
-              Logger.e(TAG, "Failed to read unbound session state, ignored");
-            }
-            timeTracker.stop("task", "updateContentTracker");
-            unboundSessionCount++;
-          };
-      taskQueue.execute(Task.UPDATE_CONTENT_TRACKER, TaskType.BACKGROUND, updateContentTracker);
     }
 
     if (cleanupSessions) {
@@ -484,9 +461,7 @@
         Task.GARBAGE_COLLECT_CONTENT,
         TaskType.BACKGROUND,
         store.triggerContentGc(
-            reservedContentIds,
-            getAccessibleContentSupplier(sessionContentTrackers),
-            shouldKeepSharedStates()));
+            reservedContentIds, getAccessibleContentSupplier(), shouldKeepSharedStates()));
   }
 
   private boolean shouldKeepSharedStates() {
@@ -503,15 +478,31 @@
     return false;
   }
 
-  private Supplier<Set<String>> getAccessibleContentSupplier(
-      List<SessionContentTracker> sessionContentTrackers) {
+  private Supplier<Set<String>> getAccessibleContentSupplier() {
     return () -> {
-      /*
-       * We add head separately to make sure it is not GC-ed, just in case it is not present
-       * in {@link #getPersistedSessions()}.
-       */
+      threadUtils.checkNotMainThread();
+      Logger.i(TAG, "Determining accessible content");
+
+      // SessionIds should be determined at the time GC runs.
+      Set<String> sessionIds;
+      synchronized (lock) {
+        sessionIds = sessionsMetadata.keySet();
+      }
+
       Set<String> accessibleContent = new HashSet<>(head.getContentInSession());
-      for (SessionContentTracker sessionContentTracker : sessionContentTrackers) {
+      for (String sessionId : sessionIds) {
+        if (sessionId.equals(head.getSessionId())) {
+          continue;
+        }
+
+        SessionContentTracker sessionContentTracker =
+            new SessionContentTracker(/* supportsClearAll= */ false);
+        Result<List<StreamStructure>> streamStructuresResult = store.getStreamStructures(sessionId);
+        if (streamStructuresResult.isSuccessful()) {
+          sessionContentTracker.update(streamStructuresResult.getValue());
+        } else {
+          Logger.e(TAG, "Failed to read unbound session state, ignored");
+        }
         accessibleContent.addAll(sessionContentTracker.getContentIds());
       }
       return accessibleContent;
diff --git a/src/test/java/com/google/android/libraries/feed/infraintegration/BUILD b/src/test/java/com/google/android/libraries/feed/infraintegration/BUILD
index 4992cba..ad9229d 100644
--- a/src/test/java/com/google/android/libraries/feed/infraintegration/BUILD
+++ b/src/test/java/com/google/android/libraries/feed/infraintegration/BUILD
@@ -199,9 +199,11 @@
         "//src/main/java/com/google/android/libraries/feed/api/common",
         "//src/main/java/com/google/android/libraries/feed/api/host/config",
         "//src/main/java/com/google/android/libraries/feed/api/host/logging",
+        "//src/main/java/com/google/android/libraries/feed/api/host/scheduler",
         "//src/main/java/com/google/android/libraries/feed/api/internal/common",
+        "//src/main/java/com/google/android/libraries/feed/common/concurrent/testing",
         "//src/main/java/com/google/android/libraries/feed/common/testing",
-        "//src/main/java/com/google/android/libraries/feed/common/time/testing",
+        "//src/main/java/com/google/android/libraries/feed/testing/host/scheduler",
         "//src/main/proto/com/google/android/libraries/feed/api/internal/proto:client_feed_java_proto_lite",
         "//src/main/proto/search/now/wire/feed:feed_java_proto_lite",
         "//third_party:robolectric",
diff --git a/src/test/java/com/google/android/libraries/feed/infraintegration/GcTest.java b/src/test/java/com/google/android/libraries/feed/infraintegration/GcTest.java
index 3c02e51..7333649 100644
--- a/src/test/java/com/google/android/libraries/feed/infraintegration/GcTest.java
+++ b/src/test/java/com/google/android/libraries/feed/infraintegration/GcTest.java
@@ -20,10 +20,12 @@
 import com.google.android.libraries.feed.api.host.config.Configuration;
 import com.google.android.libraries.feed.api.host.config.Configuration.ConfigKey;
 import com.google.android.libraries.feed.api.host.logging.RequestReason;
+import com.google.android.libraries.feed.api.host.scheduler.SchedulerApi.RequestBehavior;
 import com.google.android.libraries.feed.api.internal.common.PayloadWithId;
+import com.google.android.libraries.feed.common.concurrent.testing.FakeThreadUtils;
 import com.google.android.libraries.feed.common.testing.InfraIntegrationScope;
 import com.google.android.libraries.feed.common.testing.ResponseBuilder;
-import com.google.android.libraries.feed.common.time.testing.FakeClock;
+import com.google.android.libraries.feed.testing.host.scheduler.FakeSchedulerApi;
 import com.google.search.now.feed.client.StreamDataProto.StreamSharedState;
 import com.google.search.now.feed.client.StreamDataProto.UiContext;
 import com.google.search.now.wire.feed.ContentIdProto.ContentId;
@@ -60,13 +62,15 @@
         ResponseBuilder.createFeatureContentId(3), ResponseBuilder.createFeatureContentId(4)
       };
   private static final long LIFETIME_MS = Duration.ofHours(1).toMillis();
+  private static final long TIMEOUT_MS = Duration.ofSeconds(5).toMillis();
 
+  private final Configuration configuration =
+      new Configuration.Builder().put(ConfigKey.SESSION_LIFETIME_MS, LIFETIME_MS).build();
   private final InfraIntegrationScope scope =
       new InfraIntegrationScope.Builder()
-          .setConfiguration(
-              new Configuration.Builder().put(ConfigKey.SESSION_LIFETIME_MS, LIFETIME_MS).build())
+          .setConfiguration(configuration)
+          .withTimeoutSessionConfiguration(TIMEOUT_MS)
           .build();
-  private final FakeClock fakeClock = scope.getFakeClock();
 
   @Test
   public void testGc_contentInLiveSessionRetained() {
@@ -93,7 +97,7 @@
             scope.getFeedSessionManager().getUpdateConsumer(MutationContext.EMPTY_CONTEXT));
 
     // Advance the clock without expiring the first session.
-    fakeClock.advance(LIFETIME_MS / 2);
+    scope.getFakeClock().advance(LIFETIME_MS / 2);
     InfraIntegrationScope secondScope = scope.clone();
     assertPayloads(REQUEST_1, secondScope, /* shouldExist= */ true);
     assertSharedStates(new ContentId[] {PIET_SHARED_STATE_1}, secondScope, /* shouldExist= */ true);
@@ -127,7 +131,7 @@
 
     // Advance the clock to expire the first session, create a new scope that will run
     // initialization and delete content from the expired session.
-    fakeClock.advance(LIFETIME_MS + 1L);
+    scope.getFakeClock().advance(LIFETIME_MS + 1L);
     InfraIntegrationScope secondScope = scope.clone();
     assertPayloads(REQUEST_1, secondScope, /* shouldExist= */ false);
     assertSharedStates(
@@ -136,6 +140,42 @@
     assertSharedStates(new ContentId[] {PIET_SHARED_STATE_2}, secondScope, /* shouldExist= */ true);
   }
 
+  @Test
+  public void testGc_contentBranchedMidInitializationRetained() {
+    InfraIntegrationScope scope =
+        new InfraIntegrationScope.Builder()
+            .setConfiguration(configuration)
+            .setSchedulerApi(
+                new FakeSchedulerApi(FakeThreadUtils.withoutThreadChecks())
+                    .setRequestBehavior(RequestBehavior.REQUEST_WITH_CONTENT))
+            .withQueuingTasks()
+            .withTimeoutSessionConfiguration(TIMEOUT_MS)
+            .build();
+
+    // Populate HEAD with REQUEST_1.
+    scope
+        .getFakeFeedRequestManager()
+        .queueResponse(createResponse(REQUEST_1, PIET_SHARED_STATE_1))
+        .triggerRefresh(
+            RequestReason.OPEN_WITHOUT_CONTENT,
+            scope.getFeedSessionManager().getUpdateConsumer(MutationContext.EMPTY_CONTEXT));
+    scope.getFakeDirectExecutor().runAllTasks();
+
+    // Make a new scope and enqueue a request to be sent on the next new session. GC should run
+    // after the new session is created and branched off a HEAD containing REQUEST_1.
+    InfraIntegrationScope secondScope = scope.clone();
+    secondScope
+        .getFakeFeedRequestManager()
+        .queueResponse(createResponse(REQUEST_2, PIET_SHARED_STATE_2));
+    secondScope
+        .getModelProviderFactory()
+        .createNew(/* viewDepthProvider= */ null, UiContext.getDefaultInstance());
+    secondScope.getFakeDirectExecutor().runAllTasks();
+    assertThat(secondScope.getFakeDirectExecutor().hasTasks()).isFalse();
+    assertPayloads(REQUEST_1, secondScope, /* shouldExist= */ true);
+    assertPayloads(REQUEST_2, secondScope, /* shouldExist= */ true);
+  }
+
   private static void assertPayloads(
       ContentId[] contentIds, InfraIntegrationScope scope, boolean shouldExist) {
     scope.getFakeThreadUtils().enforceMainThread(false);