// blob: cc61d23ef5ab5da5155a2a62e32a6c18f8fd76d9 [file] [log] [blame]
// Copyright 2018 The Feed Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.android.libraries.feed.feedsessionmanager.internal;
import android.support.annotation.VisibleForTesting;
import android.text.TextUtils;
import com.google.android.libraries.feed.api.client.knowncontent.KnownContent;
import com.google.android.libraries.feed.api.common.MutationContext;
import com.google.android.libraries.feed.api.host.logging.Task;
import com.google.android.libraries.feed.api.host.scheduler.SchedulerApi;
import com.google.android.libraries.feed.api.host.storage.CommitResult;
import com.google.android.libraries.feed.api.internal.common.Model;
import com.google.android.libraries.feed.api.internal.common.ThreadUtils;
import com.google.android.libraries.feed.api.internal.modelprovider.ModelError;
import com.google.android.libraries.feed.api.internal.modelprovider.ModelError.ErrorType;
import com.google.android.libraries.feed.api.internal.modelprovider.ModelProvider;
import com.google.android.libraries.feed.api.internal.modelprovider.ModelProvider.State;
import com.google.android.libraries.feed.api.internal.store.ActionPropertiesMutation;
import com.google.android.libraries.feed.api.internal.store.ContentMutation;
import com.google.android.libraries.feed.api.internal.store.SemanticPropertiesMutation;
import com.google.android.libraries.feed.api.internal.store.Store;
import com.google.android.libraries.feed.common.Result;
import com.google.android.libraries.feed.common.concurrent.TaskQueue;
import com.google.android.libraries.feed.common.concurrent.TaskQueue.TaskType;
import com.google.android.libraries.feed.common.functional.Consumer;
import com.google.android.libraries.feed.common.logging.Dumpable;
import com.google.android.libraries.feed.common.logging.Dumper;
import com.google.android.libraries.feed.common.logging.Logger;
import com.google.android.libraries.feed.common.logging.StringFormattingUtils;
import com.google.android.libraries.feed.common.time.Clock;
import com.google.android.libraries.feed.common.time.TimingUtils;
import com.google.android.libraries.feed.common.time.TimingUtils.ElapsedTimeTracker;
import com.google.search.now.feed.client.StreamDataProto;
import com.google.search.now.feed.client.StreamDataProto.StreamDataOperation;
import com.google.search.now.feed.client.StreamDataProto.StreamPayload;
import com.google.search.now.feed.client.StreamDataProto.StreamStructure;
import com.google.search.now.feed.client.StreamDataProto.StreamStructure.Operation;
import com.google.search.now.feed.client.StreamDataProto.StreamToken;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* Factory for the tasks which commit a mutation to the FeedSessionManager and its Sessions. An
* instance is created and used by the {@link
* com.google.android.libraries.feed.api.internal.sessionmanager.FeedSessionManager} to commit a
* mutation defined as a List of {@link StreamDataOperation}.
*/
public final class SessionManagerMutation implements Dumpable {
// Tag used for logging, for elapsed-time tracking, and as the title of the dump output.
private static final String TAG = "SessionManagerMutation";
// Collaborators handed in through the constructor; none of them is reassigned afterwards.
private final Store store;
private final SessionCache sessionCache;
private final ContentCache contentCache;
private final TaskQueue taskQueue;
private final SchedulerApi schedulerApi;
private final ThreadUtils threadUtils;
private final TimingUtils timingUtils;
private final Clock clock;
// operation counts for the dumper
// Incremented once per createCommitter call.
private int createCount = 0;
// Incremented at the end of every commitTask run, after content and session updates were applied.
private int commitCount = 0;
// Incremented for an unsuccessful result passed to accept and for invalid or unsupported
// operations encountered by commitContent.
private int errorCount = 0;
// Each of the following is incremented when the matching store mutation's commit reports FAILURE.
private int contentCommitErrorCount = 0;
private int semanticPropertiesCommitErrorCount = 0;
private int actionPropertiesCommitErrorCount = 0;
/**
* Listens for errors which need to be reported to a ModelProvider.
*
* <p>Within this file the observer is invoked by {@code MutationCommitter.accept} when the result
* it receives is not successful.
*/
public interface ModelErrorObserver {
/**
* Reports an error raised while handling a mutation.
*
* @param session the attached session which requested the mutation; may be {@code null}, e.g.
*     when the mutation carries no requesting session id
* @param error the error to report
*/
void onError(/*@Nullable*/ Session session, ModelError error);
}
/**
* Creates the factory. Every argument is stored as-is in the field of the same name.
*
* @param store used to clear head and to create the content, semantic-properties and
*     action-properties mutations
* @param sessionCache source of the head session and of the attached/all sessions to update
* @param contentCache receives feature/token payloads between startMutation and finishMutation
* @param taskQueue runs the invalidate, commit, persist and request-failure tasks
* @param schedulerApi notified with the update time when a cleared head receives new content
* @param threadUtils used for the not-main-thread checks
* @param timingUtils provides the elapsed-time trackers
* @param clock provides the update time recorded for the head session
*/
public SessionManagerMutation(
Store store,
SessionCache sessionCache,
ContentCache contentCache,
TaskQueue taskQueue,
SchedulerApi schedulerApi,
ThreadUtils threadUtils,
TimingUtils timingUtils,
Clock clock) {
this.store = store;
this.sessionCache = sessionCache;
this.contentCache = contentCache;
this.taskQueue = taskQueue;
this.schedulerApi = schedulerApi;
this.threadUtils = threadUtils;
this.timingUtils = timingUtils;
this.clock = clock;
}
/**
 * Creates the consumer which applies the outcome of a request to the {@link
 * com.google.android.libraries.feed.api.internal.sessionmanager.FeedSessionManager}.
 *
 * <p>Every call increments the {@code mutationsCreated} counter reported by {@code dump}.
 *
 * @param task label appended to the timing entry recorded when the commit task runs
 * @param mutationContext context of the request which produced the mutation
 * @param modelErrorObserver notified when the consumer receives an unsuccessful result
 * @param knownContentListener optional listener informed when the head session receives content
 * @return a consumer of the request's {@code Result<Model>}
 */
public Consumer<Result<Model>> createCommitter(
    String task,
    MutationContext mutationContext,
    ModelErrorObserver modelErrorObserver,
    KnownContent./*@Nullable*/ Listener knownContentListener) {
  createCount += 1;
  MutationCommitter committer =
      new MutationCommitter(task, mutationContext, modelErrorObserver, knownContentListener);
  return committer;
}
/**
 * Queues a {@code HEAD_INVALIDATE} task which resets head without an initiating session (the
 * mutation session id passed to the committer is {@code null}).
 */
public void resetHead() {
  HeadMutationCommitter headCommitter = new HeadMutationCommitter();
  taskQueue.execute(
      Task.INVALIDATE_HEAD,
      TaskType.HEAD_INVALIDATE,
      () -> {
        headCommitter.resetHead(null);
      });
}
/**
 * Resets head synchronously on the calling thread instead of going through the task queue. The
 * underlying reset performs the {@code ThreadUtils} not-main-thread check.
 */
public void forceResetHead() {
  new HeadMutationCommitter().resetHead(null);
}
/** Writes the operation counters of this instance to {@code dumper}, titled with the class tag. */
@Override
public void dump(Dumper dumper) {
  dumper.title(TAG);
  dumper.forKey("mutationsCreated").value(createCount);
  dumpCompactCounter(dumper, "commitCount", commitCount);
  dumpCompactCounter(dumper, "errorCount", errorCount);
  dumpCompactCounter(dumper, "contentCommitErrorCount", contentCommitErrorCount);
  dumpCompactCounter(
      dumper, "semanticPropertiesCommitErrorCount", semanticPropertiesCommitErrorCount);
  dumpCompactCounter(dumper, "actionPropertiesCommitErrorCount", actionPropertiesCommitErrorCount);
}

/** Emits a single counter under {@code key} and calls {@code compactPrevious()} on the entry. */
private void dumpCompactCounter(Dumper dumper, String key, int count) {
  dumper.forKey(key).value(count).compactPrevious();
}
/**
 * Checks that a {@link StreamDataOperation} has both a payload and a stream structure, and that
 * the structure carries a non-empty content id. Each failed check is logged as an error.
 *
 * @param dataOperation the operation to check
 * @return {@code true} when both checks pass
 */
public static boolean validDataOperation(StreamDataOperation dataOperation) {
  boolean hasRequiredParts =
      dataOperation.hasStreamPayload() && dataOperation.hasStreamStructure();
  if (!hasRequiredParts) {
    Logger.e(TAG, "Invalid StreamDataOperation - payload or streamStructure not defined");
    return false;
  }
  boolean hasContentId = !TextUtils.isEmpty(dataOperation.getStreamStructure().getContentId());
  if (!hasContentId) {
    Logger.e(TAG, "Invalid StreamDataOperation - No ID Found");
  }
  return hasContentId;
}
// TODO: Tiktok doesn't allow MutationCommitter to extend HeadMutationCommitter
/** Clears head in the store and invalidates the attached sessions affected by the clear. */
class HeadMutationCommitter {
  /**
   * Clears head and invalidates every attached session which opted in through {@code
   * invalidateOnResetHead()} and qualifies under {@link #shouldInvalidateSession}.
   *
   * <p>This runs as a task on the executor thread and also as part of a SessionMutation commit.
   *
   * @param mutationSessionId id of the session which initiated the clear, or {@code null} when the
   *     clear was not initiated by a session
   */
  @VisibleForTesting
  void resetHead(/*@Nullable*/ String mutationSessionId) {
    threadUtils.checkNotMainThread();
    ElapsedTimeTracker resetTimer = timingUtils.getElapsedTimeTracker(TAG);
    // The attached sessions are read before head is cleared in the store.
    Collection<Session> candidates = sessionCache.getAttachedSessions();
    store.clearHead();
    for (Session candidate : candidates) {
      ModelProvider provider = candidate.getModelProvider();
      if (provider == null) {
        continue;
      }
      if (!candidate.invalidateOnResetHead()) {
        continue;
      }
      if (!shouldInvalidateSession(mutationSessionId, provider)) {
        continue;
      }
      invalidateSession(provider, candidate);
    }
    resetTimer.stop("task", "resetHead");
  }

  /**
   * Decides whether the session behind {@code modelProvider} is invalidated by a head reset.
   *
   * <p>A ModelProvider which is not READY is never invalidated. A READY one is invalidated when
   * any of the following holds:
   *
   * <ol>
   *   <li>Clearing head was initiated externally, not from an existing session
   *   <li>The ModelProvider doesn't yet have a session, so the session will be created from the
   *       new $HEAD
   *   <li>Clearing head was initiated by the ModelProvider's own session
   * </ol>
   *
   * @param mutationSessionId id of the session which initiated the clear, or {@code null}
   * @param modelProvider the provider to evaluate
   * @return {@code true} when the provider should be invalidated
   */
  @VisibleForTesting
  boolean shouldInvalidateSession(
      /*@Nullable*/ String mutationSessionId, ModelProvider modelProvider) {
    boolean providerReady = modelProvider.getCurrentState() == State.READY;
    if (!providerReady) {
      return false;
    }
    if (mutationSessionId == null) {
      // Clear was done outside of any session.
      return true;
    }
    String providerSessionId = modelProvider.getSessionId();
    if (providerSessionId == null) {
      // No session yet on this provider.
      return true;
    }
    // Only the session which requested the clear is invalidated.
    return providerSessionId.equals(mutationSessionId);
  }

  /** Logs the session id and invalidates its ModelProvider; must not run on the main thread. */
  private void invalidateSession(ModelProvider modelProvider, Session session) {
    threadUtils.checkNotMainThread();
    String invalidatedId = session.getSessionId();
    Logger.i(TAG, "Invalidate session %s", invalidatedId);
    modelProvider.invalidate();
  }
}
@VisibleForTesting
class MutationCommitter extends HeadMutationCommitter implements Consumer<Result<Model>> {
// Label appended to the timing entry recorded by commitTask.
private final String task;
// Context of the request which produced this mutation; several methods guard against null.
private final MutationContext mutationContext;
// Receives the ModelError when accept is handed an unsuccessful result.
private final ModelErrorObserver modelErrorObserver;
// Optional listener told when the head session receives content.
private final KnownContent./*@Nullable*/ Listener knownContentListener;
// Structure changes gathered by commitContent and applied to sessions by commitSessionUpdates.
private final List<StreamStructure> streamStructures = new ArrayList<>();
// Set by accept when the received operations contain a CLEAR_ALL.
@VisibleForTesting boolean clearedHead = false;
// Operations of the accepted result; remains unassigned until accept sees a successful result.
private List<StreamDataOperation> dataOperations;
/** Stores the arguments; instances are created through {@code createCommitter}. */
private MutationCommitter(
String task,
MutationContext mutationContext,
ModelErrorObserver modelErrorObserver,
KnownContent./*@Nullable*/ Listener knownContentListener) {
this.task = task;
this.mutationContext = mutationContext;
this.modelErrorObserver = modelErrorObserver;
this.knownContentListener = knownContentListener;
}
/**
 * Receives the result of the request behind this mutation.
 *
 * <p>An unsuccessful result is counted and reported to the {@link ModelErrorObserver}; nothing is
 * committed. A successful result has its operations recorded and a {@code COMMIT_TASK} queued:
 * user-initiated mutations run as {@code IMMEDIATE}, mutations containing a {@code CLEAR_ALL} as
 * {@code HEAD_RESET}, and everything else as {@code USER_FACING}.
 *
 * @param updateResults the outcome of the request
 */
@Override
public void accept(Result<Model> updateResults) {
  if (!updateResults.isSuccessful()) {
    handleFailedUpdate();
    return;
  }
  dataOperations = updateResults.getValue().streamDataOperations;
  for (StreamDataOperation operation : dataOperations) {
    if (operation.getStreamStructure().getOperation() == Operation.CLEAR_ALL) {
      clearedHead = true;
      break;
    }
  }
  int taskType;
  if (mutationContext != null && mutationContext.isUserInitiated()) {
    taskType = TaskType.IMMEDIATE;
  } else {
    taskType = clearedHead ? TaskType.HEAD_RESET : TaskType.USER_FACING;
  }
  taskQueue.execute(Task.COMMIT_TASK, taskType, this::commitTask);
}

/**
 * Counts and reports an unsuccessful update. The mutation context is read null-safely, matching
 * the null guards used elsewhere in this class, and the continuation token is read only once.
 */
private void handleFailedUpdate() {
  errorCount++;
  String sessionId =
      (mutationContext != null) ? mutationContext.getRequestingSessionId() : null;
  Session session = (sessionId != null) ? sessionCache.getAttached(sessionId) : null;
  StreamToken streamToken =
      (mutationContext != null) ? mutationContext.getContinuationToken() : null;
  if (streamToken != null) {
    // The failed request was a continuation; it is only reportable against a known session.
    if (session != null) {
      Logger.e(TAG, "Error found with a token request %s", streamToken.getContentId());
      modelErrorObserver.onError(
          session, new ModelError(ErrorType.PAGINATION_ERROR, streamToken.getNextPageToken()));
    } else {
      Logger.e(TAG, "Unable to process PAGINATION_ERROR");
    }
    return;
  }
  Logger.e(TAG, "Update error, the update is being ignored");
  modelErrorObserver.onError(session, new ModelError(ErrorType.NO_CARDS_ERROR, null));
  // An empty HEAD_RESET task is still queued for the failed request.
  // NOTE(review): presumably this lets the task queue leave a state in which it waits for a head
  // reset - confirm against TaskQueue.
  taskQueue.execute(Task.REQUEST_FAILURE, TaskType.HEAD_RESET, () -> {});
}
/**
 * Body of the queued commit task: applies the content changes, then the session updates, bumps
 * the commit counter and records the elapsed time together with the number of structure
 * mutations and whether the mutation was user initiated.
 */
private void commitTask() {
  ElapsedTimeTracker commitTimer = timingUtils.getElapsedTimeTracker(TAG);
  commitContent();
  commitSessionUpdates();
  commitCount++;
  String taskLabel = "sessionMutation.commitTask:" + task;
  int mutationCount = streamStructures.size();
  Object userInitiated;
  if (mutationContext == null) {
    userInitiated = "No MutationContext";
  } else {
    userInitiated = mutationContext.isUserInitiated();
  }
  commitTimer.stop(
      "task", taskLabel, "mutations", mutationCount, "userInitiated", userInitiated);
}
/**
 * Walks the accepted operations, stages their payloads in the store mutations, and gathers the
 * structure changes which {@code commitSessionUpdates} later applies to the sessions.
 *
 * <ul>
 *   <li>{@code CLEAR_ALL}: the structure is recorded and head is reset on behalf of the
 *       requesting session
 *   <li>{@code UPDATE_OR_APPEND}: the payload is staged by {@code stageUpdateOrAppend}
 *   <li>{@code REMOVE} / {@code REQUIRED_CONTENT}: only the structure is recorded
 *   <li>anything else is counted as an error and logged
 * </ul>
 *
 * <p>The staged mutations are committed on a {@code BACKGROUND} task. Must not be called on the
 * main thread.
 */
private void commitContent() {
  threadUtils.checkNotMainThread();
  ElapsedTimeTracker timeTracker = timingUtils.getElapsedTimeTracker(TAG);
  contentCache.startMutation();
  ContentMutation contentMutation = store.editContent();
  SemanticPropertiesMutation semanticPropertiesMutation = store.editSemanticProperties();
  ActionPropertiesMutation actionPropertiesMutation = store.editActionProperties();
  for (StreamDataOperation dataOperation : dataOperations) {
    StreamStructure structure = dataOperation.getStreamStructure();
    Operation operation = structure.getOperation();
    if (operation == Operation.CLEAR_ALL) {
      streamStructures.add(structure);
      // Null-safe, matching the mutationContext guards used elsewhere in this class.
      resetHead((mutationContext != null) ? mutationContext.getRequestingSessionId() : null);
    } else if (operation == Operation.UPDATE_OR_APPEND) {
      stageUpdateOrAppend(
          dataOperation, contentMutation, semanticPropertiesMutation, actionPropertiesMutation);
    } else if (operation == Operation.REMOVE || operation == Operation.REQUIRED_CONTENT) {
      // We don't update the content for REMOVED items, content will be garbage collected.
      streamStructures.add(structure);
    } else {
      errorCount++;
      Logger.e(TAG, "Unsupported Mutation: %s", operation);
    }
  }
  taskQueue.execute(
      Task.PERSIST_MUTATION,
      TaskType.BACKGROUND,
      () -> {
        try {
          if (contentMutation.commit().getResult() == CommitResult.Result.FAILURE) {
            contentCommitErrorCount++;
            Logger.e(TAG, "contentMutation failed");
          }
          if (semanticPropertiesMutation.commit().getResult() == CommitResult.Result.FAILURE) {
            semanticPropertiesCommitErrorCount++;
            Logger.e(TAG, "semanticPropertiesMutation failed");
          }
          if (actionPropertiesMutation.commit().getResult() == CommitResult.Result.FAILURE) {
            actionPropertiesCommitErrorCount++;
            Logger.e(TAG, "actionPropertiesMutation failed");
          }
        } finally {
          // Always close the bracket opened by startMutation(), even when a commit throws.
          contentCache.finishMutation();
        }
      });
  timeTracker.stop("", "contentUpdate", "items", dataOperations.size());
}

/**
 * Stages the payload of one {@code UPDATE_OR_APPEND} operation in the mutation matching its
 * payload type. Invalid operations are counted as errors and skipped; an unsupported payload type
 * is only logged.
 */
private void stageUpdateOrAppend(
    StreamDataOperation dataOperation,
    ContentMutation contentMutation,
    SemanticPropertiesMutation semanticPropertiesMutation,
    ActionPropertiesMutation actionPropertiesMutation) {
  if (!validDataOperation(dataOperation)) {
    errorCount++;
    return;
  }
  StreamStructure structure = dataOperation.getStreamStructure();
  String contentId = structure.getContentId();
  StreamPayload payload = dataOperation.getStreamPayload();
  if (payload.hasStreamSharedState()) {
    // don't add StreamSharedState to the metadata list stored for sessions
    contentMutation.add(contentId, payload);
  } else if (payload.hasStreamFeature() || payload.hasStreamToken()) {
    contentCache.put(contentId, payload);
    contentMutation.add(contentId, payload);
    streamStructures.add(structure);
  } else if (payload.hasSemanticData()) {
    semanticPropertiesMutation.add(contentId, payload.getSemanticData());
  } else if (payload.hasActionData()) {
    actionPropertiesMutation.add(contentId, payload.getActionData());
  } else {
    Logger.e(TAG, "Unsupported UPDATE_OR_APPEND payload");
  }
}
/**
 * Applies the gathered structure changes to every session known to the session cache.
 *
 * <p>When the mutation originated from a continuation token, a REMOVE for that token is put in
 * front of the changes first. Sessions whose ModelProvider is INVALIDATED are removed from the
 * cache instead of being updated. The head session additionally notifies the known-content
 * listener and, when head was cleared, records the new update time in the session cache and the
 * scheduler. Must not be called on the main thread.
 */
private void commitSessionUpdates() {
  threadUtils.checkNotMainThread();
  ElapsedTimeTracker sessionTimer = timingUtils.getElapsedTimeTracker(TAG);
  StreamToken sourceToken = null;
  if (mutationContext != null) {
    sourceToken = mutationContext.getContinuationToken();
  }
  if (sourceToken != null) {
    // The token which triggered this mutation is consumed, so sessions drop it first.
    StreamStructure tokenRemoval = addTokenRemoveOperation(sourceToken);
    if (tokenRemoval != null) {
      streamStructures.add(0, tokenRemoval);
    }
  }
  Collection<Session> allSessions = sessionCache.getAllSessions();
  HeadSessionImpl headSession = sessionCache.getHead();
  for (Session current : allSessions) {
    ModelProvider provider = current.getModelProvider();
    boolean invalidated = provider != null && provider.getCurrentState() == State.INVALIDATED;
    if (invalidated) {
      Logger.w(TAG, "Removing an invalidate session");
      // NOTE(review): removes from the cache while iterating getAllSessions(); presumably that
      // call returns a copy - confirm against SessionCache.
      sessionCache.removeAttached(current.getSessionId());
      continue;
    }
    if (current == headSession) {
      long receivedAtMillis = clock.currentTimeMillis();
      if (!clearedHead) {
        if (knownContentListener != null) {
          knownContentListener.onNewContentReceived(/* isNewRefresh */ false, receivedAtMillis);
        }
      } else {
        refreshClearedHead(current, receivedAtMillis);
        continue;
      }
    }
    Logger.i(TAG, "Update Session %s", current.getSessionId());
    current.updateSession(clearedHead, streamStructures, mutationContext);
  }
  sessionTimer.stop("", "sessionUpdate", "sessions", allSessions.size());
}

/**
 * Updates the head session after a clear, then publishes {@code updateTime} to the session cache,
 * the scheduler and, when present, the known-content listener.
 */
private void refreshClearedHead(Session headSession, long updateTime) {
  headSession.updateSession(clearedHead, streamStructures, mutationContext);
  sessionCache.updateHeadLastAddedTimeMillis(updateTime);
  schedulerApi.onReceiveNewContent(updateTime);
  if (knownContentListener != null) {
    knownContentListener.onNewContentReceived(/* isNewRefresh */ true, updateTime);
  }
  Logger.i(
      TAG, "Cleared Head, new creation time %s", StringFormattingUtils.formatLogDate(updateTime));
}
/**
 * Builds the REMOVE structure for a continuation token, copying the token's content id and
 * parent id.
 *
 * @param token the continuation token which was the source of the mutation
 * @return the REMOVE operation targeting {@code token}
 */
/*@Nullable*/
private StreamStructure addTokenRemoveOperation(StreamToken token) {
  StreamStructure.Builder removal = StreamStructure.newBuilder();
  removal.setContentId(token.getContentId());
  removal.setParentContentId(token.getParentId());
  removal.setOperation(Operation.REMOVE);
  return removal.build();
}
}
}