Fix a race condition where content is incorrectly garbage collected.
1. A new session is created with strategy REQUEST_WITH_CONTENT.
2. Initialization hasn't completed, so the new session is not included in
SessionCache when GC is enqueued.
3. The request completes with a new HEAD, GC runs.
4. GC deletes content that was branched into the session in step #1.
PiperOrigin-RevId: 261332120
Change-Id: Idbbab6732f28315cfb2ed4f0cfc7a097d957d5ef
diff --git a/src/main/java/com/google/android/libraries/feed/api/host/config/Configuration.java b/src/main/java/com/google/android/libraries/feed/api/host/config/Configuration.java
index 5aefa7e..e82350a 100644
--- a/src/main/java/com/google/android/libraries/feed/api/host/config/Configuration.java
+++ b/src/main/java/com/google/android/libraries/feed/api/host/config/Configuration.java
@@ -191,6 +191,11 @@
return values.containsKey(key);
}
+ /** Returns a {@link Builder} for this {@link Configuration}. */
+ public Builder toBuilder() {
+ return new Builder(values);
+ }
+
/** Returns a default {@link Configuration}. */
public static Configuration getDefaultInstance() {
return new Builder().build();
@@ -198,7 +203,15 @@
/** Builder class used to create {@link Configuration} objects. */
public static final class Builder {
- private final HashMap<String, Object> values = new HashMap<>();
+ private final HashMap<String, Object> values;
+
+ private Builder(HashMap<String, Object> values) {
+ this.values = new HashMap<>(values);
+ }
+
+ public Builder() {
+ this(new HashMap<>());
+ }
public Builder put(@ConfigKey String key, String value) {
values.put(key, value);
diff --git a/src/main/java/com/google/android/libraries/feed/api/host/logging/Task.java b/src/main/java/com/google/android/libraries/feed/api/host/logging/Task.java
index 5158b68..026e054 100644
--- a/src/main/java/com/google/android/libraries/feed/api/host/logging/Task.java
+++ b/src/main/java/com/google/android/libraries/feed/api/host/logging/Task.java
@@ -56,7 +56,6 @@
Task.SESSION_MANAGER_TRIGGER_REFRESH,
Task.SESSION_MUTATION,
Task.TASK_QUEUE_INITIALIZE,
- Task.UPDATE_CONTENT_TRACKER,
Task.UPLOAD_ALL_ACTIONS_FOR_URL,
Task.NEXT_VALUE,
})
@@ -95,7 +94,6 @@
int SESSION_MANAGER_TRIGGER_REFRESH = 28;
int SESSION_MUTATION = 29;
int TASK_QUEUE_INITIALIZE = 30;
- int UPDATE_CONTENT_TRACKER = 31;
int UPLOAD_ALL_ACTIONS_FOR_URL = 32;
// The next value that should be used when adding additional values to the IntDef.
diff --git a/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeDirectExecutor.java b/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeDirectExecutor.java
index 92cc1ab..74659f9 100644
--- a/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeDirectExecutor.java
+++ b/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeDirectExecutor.java
@@ -14,23 +14,57 @@
package com.google.android.libraries.feed.common.concurrent.testing;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
/** Fake {@link Executor} that enforces a background thread. */
public final class FakeDirectExecutor implements Executor {
+ private final AtomicBoolean currentlyExecutingTasks = new AtomicBoolean();
private final FakeThreadUtils fakeThreadUtils;
+ private final List<Runnable> tasksToRun = new ArrayList<>();
+ private final boolean shouldQueueTasks;
- public FakeDirectExecutor(FakeThreadUtils fakeThreadUtils) {
+ public static FakeDirectExecutor runTasksImmediately(FakeThreadUtils fakeThreadUtils) {
+ return new FakeDirectExecutor(fakeThreadUtils, /* shouldQueueTasks= */ false);
+ }
+
+ public static FakeDirectExecutor queueAllTasks(FakeThreadUtils fakeThreadUtils) {
+ return new FakeDirectExecutor(fakeThreadUtils, /* shouldQueueTasks= */ true);
+ }
+
+ private FakeDirectExecutor(FakeThreadUtils fakeThreadUtils, boolean shouldQueueTasks) {
this.fakeThreadUtils = fakeThreadUtils;
+ this.shouldQueueTasks = shouldQueueTasks;
}
@Override
public void execute(Runnable command) {
+ tasksToRun.add(command);
+ if (!shouldQueueTasks) {
+ runAllTasks();
+ }
+ }
+
+ public void runAllTasks() {
+ if (currentlyExecutingTasks.getAndSet(true)) {
+ return;
+ }
+
boolean policy = fakeThreadUtils.enforceMainThread(false);
try {
- command.run();
+ while (!tasksToRun.isEmpty()) {
+ Runnable task = tasksToRun.remove(0);
+ task.run();
+ }
} finally {
fakeThreadUtils.enforceMainThread(policy);
+ currentlyExecutingTasks.set(false);
}
}
+
+ public boolean hasTasks() {
+ return !tasksToRun.isEmpty();
+ }
}
diff --git a/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeTaskQueue.java b/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeTaskQueue.java
index c2e4c0e..758cf68 100644
--- a/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeTaskQueue.java
+++ b/src/main/java/com/google/android/libraries/feed/common/concurrent/testing/FakeTaskQueue.java
@@ -27,7 +27,7 @@
public FakeTaskQueue(FakeClock fakeClock, FakeThreadUtils fakeThreadUtils) {
super(
new FakeBasicLoggingApi(),
- new FakeDirectExecutor(fakeThreadUtils),
+ FakeDirectExecutor.runTasksImmediately(fakeThreadUtils),
FakeMainThreadRunner.create(fakeClock),
fakeClock);
}
diff --git a/src/main/java/com/google/android/libraries/feed/common/testing/InfraIntegrationScope.java b/src/main/java/com/google/android/libraries/feed/common/testing/InfraIntegrationScope.java
index d39afed..749e022 100644
--- a/src/main/java/com/google/android/libraries/feed/common/testing/InfraIntegrationScope.java
+++ b/src/main/java/com/google/android/libraries/feed/common/testing/InfraIntegrationScope.java
@@ -51,7 +51,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
@@ -69,8 +68,8 @@
private final Configuration configuration;
private final ContentStorageDirect contentStorage;
- private final Executor executor;
private final FakeClock fakeClock;
+ private final FakeDirectExecutor fakeDirectExecutor;
private final FakeMainThreadRunner fakeMainThreadRunner;
private final FakeFeedRequestManager fakeFeedRequestManager;
private final FakeThreadUtils fakeThreadUtils;
@@ -86,7 +85,7 @@
private InfraIntegrationScope(
FakeThreadUtils fakeThreadUtils,
- Executor executor,
+ FakeDirectExecutor fakeDirectExecutor,
SchedulerApi schedulerApi,
FakeClock fakeClock,
Configuration configuration,
@@ -96,8 +95,8 @@
this.fakeClock = fakeClock;
this.configuration = configuration;
this.contentStorage = contentStorage;
- this.executor = executor;
this.journalStorage = journalStorage;
+ this.fakeDirectExecutor = fakeDirectExecutor;
this.fakeMainThreadRunner = fakeMainThreadRunner;
this.schedulerApi = schedulerApi;
this.fakeThreadUtils = fakeThreadUtils;
@@ -106,7 +105,8 @@
FakeBasicLoggingApi fakeBasicLoggingApi = new FakeBasicLoggingApi();
FeedExtensionRegistry extensionRegistry = new FeedExtensionRegistry(new ExtensionProvider());
- taskQueue = new TaskQueue(fakeBasicLoggingApi, executor, fakeMainThreadRunner, fakeClock);
+ taskQueue =
+ new TaskQueue(fakeBasicLoggingApi, fakeDirectExecutor, fakeMainThreadRunner, fakeClock);
store =
new FeedStore(
timingUtils,
@@ -173,6 +173,10 @@
return fakeClock;
}
+ public FakeDirectExecutor getFakeDirectExecutor() {
+ return fakeDirectExecutor;
+ }
+
public FakeMainThreadRunner getFakeMainThreadRunner() {
return fakeMainThreadRunner;
}
@@ -205,7 +209,7 @@
public InfraIntegrationScope clone() {
return new InfraIntegrationScope(
fakeThreadUtils,
- executor,
+ fakeDirectExecutor,
schedulerApi,
fakeClock,
configuration,
@@ -229,6 +233,8 @@
private final FakeThreadUtils fakeThreadUtils = FakeThreadUtils.withThreadChecks();
private Configuration configuration = Configuration.getDefaultInstance();
+ private FakeDirectExecutor fakeDirectExecutor =
+ FakeDirectExecutor.runTasksImmediately(fakeThreadUtils);
private SchedulerApi schedulerApi = new FakeSchedulerApi(fakeThreadUtils);
public Builder() {}
@@ -243,9 +249,14 @@
return this;
}
+ public Builder withQueuingTasks() {
+ fakeDirectExecutor = FakeDirectExecutor.queueAllTasks(fakeThreadUtils);
+ return this;
+ }
+
public Builder withTimeoutSessionConfiguration(long timeoutMs) {
- this.configuration =
- new Configuration.Builder()
+ configuration =
+ configuration.toBuilder()
.put(ConfigKey.USE_TIMEOUT_SCHEDULER, USE_TIMEOUT_SCHEDULER)
.put(ConfigKey.TIMEOUT_TIMEOUT_MS, timeoutMs)
.build();
@@ -255,7 +266,7 @@
public InfraIntegrationScope build() {
return new InfraIntegrationScope(
fakeThreadUtils,
- new FakeDirectExecutor(fakeThreadUtils),
+ fakeDirectExecutor,
schedulerApi,
fakeClock,
configuration,
diff --git a/src/main/java/com/google/android/libraries/feed/feedsessionmanager/internal/SessionCache.java b/src/main/java/com/google/android/libraries/feed/feedsessionmanager/internal/SessionCache.java
index 14c2e4c..c9c9a8a 100644
--- a/src/main/java/com/google/android/libraries/feed/feedsessionmanager/internal/SessionCache.java
+++ b/src/main/java/com/google/android/libraries/feed/feedsessionmanager/internal/SessionCache.java
@@ -416,7 +416,6 @@
HeadSessionImpl headSession = Validators.checkNotNull(head);
String headSessionId = headSession.getSessionId();
- List<SessionContentTracker> sessionContentTrackers = new ArrayList<>();
boolean cleanupSessions = false;
for (StreamSession session : sessionList) {
SessionMetadata metadata = getOrCreateSessionMetadata(session);
@@ -448,28 +447,6 @@
}
sessionsMetadata.put(session.getSessionId(), metadata);
}
- SessionContentTracker sessionContentTracker =
- new SessionContentTracker(/* supportsClearAll= */ false);
- // Unbound sessions are able to be created through restore so we only create content trackers
- // here and delay creation of unbound sessions until they are actually needed.
- sessionContentTrackers.add(sessionContentTracker);
-
- // Task which updates the newly created content tracker.
- Runnable updateContentTracker =
- () -> {
- Logger.i(TAG, "Task: updateContentTracker %s", sessionId);
- ElapsedTimeTracker timeTracker = timingUtils.getElapsedTimeTracker(TAG);
- Result<List<StreamStructure>> streamStructuresResult =
- store.getStreamStructures(sessionId);
- if (streamStructuresResult.isSuccessful()) {
- sessionContentTracker.update(streamStructuresResult.getValue());
- } else {
- Logger.e(TAG, "Failed to read unbound session state, ignored");
- }
- timeTracker.stop("task", "updateContentTracker");
- unboundSessionCount++;
- };
- taskQueue.execute(Task.UPDATE_CONTENT_TRACKER, TaskType.BACKGROUND, updateContentTracker);
}
if (cleanupSessions) {
@@ -484,9 +461,7 @@
Task.GARBAGE_COLLECT_CONTENT,
TaskType.BACKGROUND,
store.triggerContentGc(
- reservedContentIds,
- getAccessibleContentSupplier(sessionContentTrackers),
- shouldKeepSharedStates()));
+ reservedContentIds, getAccessibleContentSupplier(), shouldKeepSharedStates()));
}
private boolean shouldKeepSharedStates() {
@@ -503,15 +478,31 @@
return false;
}
- private Supplier<Set<String>> getAccessibleContentSupplier(
- List<SessionContentTracker> sessionContentTrackers) {
+ private Supplier<Set<String>> getAccessibleContentSupplier() {
return () -> {
- /*
- * We add head separately to make sure it is not GC-ed, just in case it is not present
- * in {@link #getPersistedSessions()}.
- */
+ threadUtils.checkNotMainThread();
+ Logger.i(TAG, "Determining accessible content");
+
+ // SessionIds should be determined at the time GC runs.
+ Set<String> sessionIds;
+ synchronized (lock) {
+ sessionIds = sessionsMetadata.keySet();
+ }
+
Set<String> accessibleContent = new HashSet<>(head.getContentInSession());
- for (SessionContentTracker sessionContentTracker : sessionContentTrackers) {
+ for (String sessionId : sessionIds) {
+ if (sessionId.equals(head.getSessionId())) {
+ continue;
+ }
+
+ SessionContentTracker sessionContentTracker =
+ new SessionContentTracker(/* supportsClearAll= */ false);
+ Result<List<StreamStructure>> streamStructuresResult = store.getStreamStructures(sessionId);
+ if (streamStructuresResult.isSuccessful()) {
+ sessionContentTracker.update(streamStructuresResult.getValue());
+ } else {
+ Logger.e(TAG, "Failed to read unbound session state, ignored");
+ }
accessibleContent.addAll(sessionContentTracker.getContentIds());
}
return accessibleContent;
diff --git a/src/test/java/com/google/android/libraries/feed/infraintegration/BUILD b/src/test/java/com/google/android/libraries/feed/infraintegration/BUILD
index 4992cba..ad9229d 100644
--- a/src/test/java/com/google/android/libraries/feed/infraintegration/BUILD
+++ b/src/test/java/com/google/android/libraries/feed/infraintegration/BUILD
@@ -199,9 +199,11 @@
"//src/main/java/com/google/android/libraries/feed/api/common",
"//src/main/java/com/google/android/libraries/feed/api/host/config",
"//src/main/java/com/google/android/libraries/feed/api/host/logging",
+ "//src/main/java/com/google/android/libraries/feed/api/host/scheduler",
"//src/main/java/com/google/android/libraries/feed/api/internal/common",
+ "//src/main/java/com/google/android/libraries/feed/common/concurrent/testing",
"//src/main/java/com/google/android/libraries/feed/common/testing",
- "//src/main/java/com/google/android/libraries/feed/common/time/testing",
+ "//src/main/java/com/google/android/libraries/feed/testing/host/scheduler",
"//src/main/proto/com/google/android/libraries/feed/api/internal/proto:client_feed_java_proto_lite",
"//src/main/proto/search/now/wire/feed:feed_java_proto_lite",
"//third_party:robolectric",
diff --git a/src/test/java/com/google/android/libraries/feed/infraintegration/GcTest.java b/src/test/java/com/google/android/libraries/feed/infraintegration/GcTest.java
index 3c02e51..7333649 100644
--- a/src/test/java/com/google/android/libraries/feed/infraintegration/GcTest.java
+++ b/src/test/java/com/google/android/libraries/feed/infraintegration/GcTest.java
@@ -20,10 +20,12 @@
import com.google.android.libraries.feed.api.host.config.Configuration;
import com.google.android.libraries.feed.api.host.config.Configuration.ConfigKey;
import com.google.android.libraries.feed.api.host.logging.RequestReason;
+import com.google.android.libraries.feed.api.host.scheduler.SchedulerApi.RequestBehavior;
import com.google.android.libraries.feed.api.internal.common.PayloadWithId;
+import com.google.android.libraries.feed.common.concurrent.testing.FakeThreadUtils;
import com.google.android.libraries.feed.common.testing.InfraIntegrationScope;
import com.google.android.libraries.feed.common.testing.ResponseBuilder;
-import com.google.android.libraries.feed.common.time.testing.FakeClock;
+import com.google.android.libraries.feed.testing.host.scheduler.FakeSchedulerApi;
import com.google.search.now.feed.client.StreamDataProto.StreamSharedState;
import com.google.search.now.feed.client.StreamDataProto.UiContext;
import com.google.search.now.wire.feed.ContentIdProto.ContentId;
@@ -60,13 +62,15 @@
ResponseBuilder.createFeatureContentId(3), ResponseBuilder.createFeatureContentId(4)
};
private static final long LIFETIME_MS = Duration.ofHours(1).toMillis();
+ private static final long TIMEOUT_MS = Duration.ofSeconds(5).toMillis();
+ private final Configuration configuration =
+ new Configuration.Builder().put(ConfigKey.SESSION_LIFETIME_MS, LIFETIME_MS).build();
private final InfraIntegrationScope scope =
new InfraIntegrationScope.Builder()
- .setConfiguration(
- new Configuration.Builder().put(ConfigKey.SESSION_LIFETIME_MS, LIFETIME_MS).build())
+ .setConfiguration(configuration)
+ .withTimeoutSessionConfiguration(TIMEOUT_MS)
.build();
- private final FakeClock fakeClock = scope.getFakeClock();
@Test
public void testGc_contentInLiveSessionRetained() {
@@ -93,7 +97,7 @@
scope.getFeedSessionManager().getUpdateConsumer(MutationContext.EMPTY_CONTEXT));
// Advance the clock without expiring the first session.
- fakeClock.advance(LIFETIME_MS / 2);
+ scope.getFakeClock().advance(LIFETIME_MS / 2);
InfraIntegrationScope secondScope = scope.clone();
assertPayloads(REQUEST_1, secondScope, /* shouldExist= */ true);
assertSharedStates(new ContentId[] {PIET_SHARED_STATE_1}, secondScope, /* shouldExist= */ true);
@@ -127,7 +131,7 @@
// Advance the clock to expire the first session, create a new scope that will run
// initialization and delete content from the expired session.
- fakeClock.advance(LIFETIME_MS + 1L);
+ scope.getFakeClock().advance(LIFETIME_MS + 1L);
InfraIntegrationScope secondScope = scope.clone();
assertPayloads(REQUEST_1, secondScope, /* shouldExist= */ false);
assertSharedStates(
@@ -136,6 +140,42 @@
assertSharedStates(new ContentId[] {PIET_SHARED_STATE_2}, secondScope, /* shouldExist= */ true);
}
+ @Test
+ public void testGc_contentBranchedMidInitializationRetained() {
+ InfraIntegrationScope scope =
+ new InfraIntegrationScope.Builder()
+ .setConfiguration(configuration)
+ .setSchedulerApi(
+ new FakeSchedulerApi(FakeThreadUtils.withoutThreadChecks())
+ .setRequestBehavior(RequestBehavior.REQUEST_WITH_CONTENT))
+ .withQueuingTasks()
+ .withTimeoutSessionConfiguration(TIMEOUT_MS)
+ .build();
+
+ // Populate HEAD with REQUEST_1.
+ scope
+ .getFakeFeedRequestManager()
+ .queueResponse(createResponse(REQUEST_1, PIET_SHARED_STATE_1))
+ .triggerRefresh(
+ RequestReason.OPEN_WITHOUT_CONTENT,
+ scope.getFeedSessionManager().getUpdateConsumer(MutationContext.EMPTY_CONTEXT));
+ scope.getFakeDirectExecutor().runAllTasks();
+
+ // Make a new scope and enqueue a request to be sent on the next new session. GC should run
+ // after the new session is created and branched off a HEAD containing REQUEST_1.
+ InfraIntegrationScope secondScope = scope.clone();
+ secondScope
+ .getFakeFeedRequestManager()
+ .queueResponse(createResponse(REQUEST_2, PIET_SHARED_STATE_2));
+ secondScope
+ .getModelProviderFactory()
+ .createNew(/* viewDepthProvider= */ null, UiContext.getDefaultInstance());
+ secondScope.getFakeDirectExecutor().runAllTasks();
+ assertThat(secondScope.getFakeDirectExecutor().hasTasks()).isFalse();
+ assertPayloads(REQUEST_1, secondScope, /* shouldExist= */ true);
+ assertPayloads(REQUEST_2, secondScope, /* shouldExist= */ true);
+ }
+
private static void assertPayloads(
ContentId[] contentIds, InfraIntegrationScope scope, boolean shouldExist) {
scope.getFakeThreadUtils().enforceMainThread(false);