| // Copyright 2018 The Feed Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.google.android.libraries.feed.feedstore.internal; |
| |
| import static com.google.android.libraries.feed.feedstore.internal.FeedStoreConstants.ACTION_PROPERTIES_PREFIX; |
| import static com.google.android.libraries.feed.feedstore.internal.FeedStoreConstants.DISMISS_ACTION_JOURNAL; |
| import static com.google.android.libraries.feed.feedstore.internal.FeedStoreConstants.SEMANTIC_PROPERTIES_PREFIX; |
| import static com.google.android.libraries.feed.feedstore.internal.FeedStoreConstants.SHARED_STATE_PREFIX; |
| import static com.google.android.libraries.feed.feedstore.internal.FeedStoreConstants.UPLOADABLE_ACTION_PREFIX; |
| |
| import android.util.Base64; |
| import com.google.android.libraries.feed.api.host.storage.CommitResult; |
| import com.google.android.libraries.feed.api.host.storage.ContentMutation.Builder; |
| import com.google.android.libraries.feed.api.host.storage.ContentStorage; |
| import com.google.android.libraries.feed.api.host.storage.ContentStorageDirect; |
| import com.google.android.libraries.feed.api.host.storage.JournalMutation; |
| import com.google.android.libraries.feed.api.host.storage.JournalStorage; |
| import com.google.android.libraries.feed.api.host.storage.JournalStorageDirect; |
| import com.google.android.libraries.feed.api.internal.common.ActionPropertiesWithId; |
| import com.google.android.libraries.feed.api.internal.common.PayloadWithId; |
| import com.google.android.libraries.feed.api.internal.common.SemanticPropertiesWithId; |
| import com.google.android.libraries.feed.api.internal.common.ThreadUtils; |
| import com.google.android.libraries.feed.api.internal.store.ActionPropertiesMutation; |
| import com.google.android.libraries.feed.api.internal.store.ContentMutation; |
| import com.google.android.libraries.feed.api.internal.store.LocalActionMutation; |
| import com.google.android.libraries.feed.api.internal.store.LocalActionMutation.ActionType; |
| import com.google.android.libraries.feed.api.internal.store.SemanticPropertiesMutation; |
| import com.google.android.libraries.feed.api.internal.store.SessionMutation; |
| import com.google.android.libraries.feed.api.internal.store.StoreListener; |
| import com.google.android.libraries.feed.api.internal.store.UploadableActionMutation; |
| import com.google.android.libraries.feed.common.Result; |
| import com.google.android.libraries.feed.common.functional.Supplier; |
| import com.google.android.libraries.feed.common.intern.Interner; |
| import com.google.android.libraries.feed.common.intern.InternerWithStats; |
| import com.google.android.libraries.feed.common.intern.WeakPoolInterner; |
| import com.google.android.libraries.feed.common.logging.Dumpable; |
| import com.google.android.libraries.feed.common.logging.Dumper; |
| import com.google.android.libraries.feed.common.logging.Logger; |
| import com.google.android.libraries.feed.common.protoextensions.FeedExtensionRegistry; |
| import com.google.android.libraries.feed.common.time.Clock; |
| import com.google.android.libraries.feed.common.time.TimingUtils; |
| import com.google.android.libraries.feed.common.time.TimingUtils.ElapsedTimeTracker; |
| import com.google.protobuf.ByteString; |
| import com.google.protobuf.InvalidProtocolBufferException; |
| import com.google.search.now.feed.client.StreamDataProto.StreamLocalAction; |
| import com.google.search.now.feed.client.StreamDataProto.StreamPayload; |
| import com.google.search.now.feed.client.StreamDataProto.StreamSharedState; |
| import com.google.search.now.feed.client.StreamDataProto.StreamStructure; |
| import com.google.search.now.feed.client.StreamDataProto.StreamUploadableAction; |
| import com.google.search.now.wire.feed.OpaqueActionDataProto.OpaqueActionData; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| |
| /** |
| * Implementation of the Store. The PersistentFeedStore will call the host APIs {@link |
| * ContentStorage} and {@link JournalStorage} to make persistent changes. |
| */ |
| public final class PersistentFeedStore implements ClearableStore, Dumpable { |
| |
| private static final String TAG = "PersistentFeedStore"; |
| |
| private final TimingUtils timingUtils; |
| private final FeedExtensionRegistry extensionRegistry; |
| private final ContentStorageDirect contentStorageDirect; |
| private final JournalStorageDirect journalStorageDirect; |
| private final ThreadUtils threadUtils; |
| private final Clock clock; |
| private final FeedStoreHelper storeHelper; |
| |
| // We use a common string interner pool because the same content IDs are reused across different |
| // protos. The actual proto interners below are backed by thisl common pool. |
| private final InternerWithStats<String> contentIdStringInterner = |
| new InternerWithStats<>(new WeakPoolInterner<>()); |
| private final Interner<StreamStructure> streamStructureInterner = |
| new StreamStructureInterner(contentIdStringInterner); |
| private final Interner<StreamPayload> streamPayloadInterner = |
| new StreamPayloadInterner(contentIdStringInterner); |
| |
| public PersistentFeedStore( |
| TimingUtils timingUtils, |
| FeedExtensionRegistry extensionRegistry, |
| ContentStorageDirect contentStorageDirect, |
| JournalStorageDirect journalStorageDirect, |
| ThreadUtils threadUtils, |
| Clock clock, |
| FeedStoreHelper storeHelper) { |
| this.timingUtils = timingUtils; |
| this.extensionRegistry = extensionRegistry; |
| this.contentStorageDirect = contentStorageDirect; |
| this.journalStorageDirect = journalStorageDirect; |
| this.threadUtils = threadUtils; |
| this.clock = clock; |
| this.storeHelper = storeHelper; |
| } |
| |
| @Override |
| public Result<List<PayloadWithId>> getPayloads(List<String> contentIds) { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| List<PayloadWithId> payloads = new ArrayList<>(contentIds.size()); |
| Result<Map<String, byte[]>> contentResult = contentStorageDirect.get(contentIds); |
| if (!contentResult.isSuccessful()) { |
| Logger.e(TAG, "Unsuccessful fetching payloads for content ids %s", contentIds); |
| tracker.stop("getPayloads failed", "items", contentIds); |
| return Result.failure(); |
| } |
| |
| // TODO: This code should be asserting that all requested contentIds have been |
| // provided, otherwise an unbound child could be exposed to the UI. |
| for (Map.Entry<String, byte[]> entry : contentResult.getValue().entrySet()) { |
| try { |
| StreamPayload streamPayload = |
| streamPayloadInterner.intern( |
| StreamPayload.parseFrom( |
| entry.getValue(), extensionRegistry.getExtensionRegistry())); |
| payloads.add(new PayloadWithId(entry.getKey(), streamPayload)); |
| } catch (InvalidProtocolBufferException e) { |
| Logger.e(TAG, "Couldn't parse content proto for id %s", entry.getKey()); |
| } |
| } |
| tracker.stop("", "getPayloads", "items", contentIds.size()); |
| return Result.success(payloads); |
| } |
| |
| @Override |
| public Result<List<StreamSharedState>> getSharedStates() { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| Result<Map<String, byte[]>> bytesResult = contentStorageDirect.getAll(SHARED_STATE_PREFIX); |
| if (!bytesResult.isSuccessful()) { |
| Logger.e(TAG, "Error fetching shared states"); |
| tracker.stop("getSharedStates", "failed"); |
| return Result.failure(); |
| } |
| Collection<byte[]> result = bytesResult.getValue().values(); |
| List<StreamSharedState> sharedStates = new ArrayList<>(result.size()); |
| for (byte[] byteArray : result) { |
| try { |
| sharedStates.add(StreamSharedState.parseFrom(byteArray)); |
| } catch (InvalidProtocolBufferException e) { |
| tracker.stop("getSharedStates", "failed"); |
| Logger.e(TAG, e, "Error parsing protocol buffer from bytes %s", byteArray); |
| } |
| } |
| tracker.stop("", "getSharedStates", "items", sharedStates.size()); |
| return Result.success(sharedStates); |
| } |
| |
| @Override |
| public Result<List<StreamStructure>> getStreamStructures(String sessionId) { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| List<StreamStructure> streamStructures; |
| Result<List<byte[]>> operationsResult = journalStorageDirect.read(sessionId); |
| if (!operationsResult.isSuccessful()) { |
| Logger.e(TAG, "Error fetching stream structures for session %s", sessionId); |
| tracker.stop("getStreamStructures failed", "session", sessionId); |
| return Result.failure(); |
| } |
| List<byte[]> results = operationsResult.getValue(); |
| streamStructures = new ArrayList<>(results.size()); |
| for (byte[] bytes : results) { |
| if (bytes.length == 0) { |
| continue; |
| } |
| try { |
| streamStructures.add(streamStructureInterner.intern(StreamStructure.parseFrom(bytes))); |
| } catch (InvalidProtocolBufferException e) { |
| Logger.e(TAG, e, "Error parsing stream structure."); |
| } |
| } |
| tracker.stop("", "getStreamStructures", "items", streamStructures.size()); |
| return Result.success(streamStructures); |
| } |
| |
| @Override |
| public Result<List<String>> getAllSessions() { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| List<String> sessions; |
| |
| Result<List<String>> namesResult = journalStorageDirect.getAllJournals(); |
| if (!namesResult.isSuccessful()) { |
| Logger.e(TAG, "Error fetching all journals"); |
| tracker.stop("getAllSessions failed"); |
| return Result.failure(); |
| } |
| |
| List<String> result = namesResult.getValue(); |
| sessions = new ArrayList<>(result.size()); |
| for (String name : result) { |
| if (HEAD_SESSION_ID.equals(name)) { |
| // Don't add $HEAD to the sessions list |
| continue; |
| } |
| if (DISMISS_ACTION_JOURNAL.equals(name)) { |
| // Don't add Dismiss Actions journal to sessions list |
| continue; |
| } |
| sessions.add(name); |
| } |
| tracker.stop("", "getAllSessions", "items", sessions.size()); |
| return Result.success(sessions); |
| } |
| |
| @Override |
| public Result<List<SemanticPropertiesWithId>> getSemanticProperties(List<String> contentIds) { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| List<SemanticPropertiesWithId> semanticPropertiesWithIds = new ArrayList<>(contentIds.size()); |
| List<String> contentIdKeys = new ArrayList<>(contentIds.size()); |
| for (String contentId : contentIds) { |
| contentIdKeys.add(SEMANTIC_PROPERTIES_PREFIX + contentId); |
| } |
| Result<Map<String, byte[]>> mapResult = contentStorageDirect.get(contentIdKeys); |
| |
| if (mapResult.isSuccessful()) { |
| for (Map.Entry<String, byte[]> entry : mapResult.getValue().entrySet()) { |
| String contentId = entry.getKey().replace(SEMANTIC_PROPERTIES_PREFIX, ""); |
| if (contentIds.contains(contentId)) { |
| semanticPropertiesWithIds.add(new SemanticPropertiesWithId(contentId, entry.getValue())); |
| } |
| } |
| } else { |
| Logger.e(TAG, "Error fetching semantic properties for content ids %s", contentIds); |
| tracker.stop("getSemanticProperties failed", contentIds); |
| } |
| |
| tracker.stop("task", "getSemanticProperties", "size", semanticPropertiesWithIds.size()); |
| return Result.success(semanticPropertiesWithIds); |
| } |
| |
| @Override |
| public Result<List<ActionPropertiesWithId>> getActionProperties(List<String> contentIds) { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| List<ActionPropertiesWithId> actionPropertiesWithIds = new ArrayList<>(contentIds.size()); |
| List<String> contentIdKeys = new ArrayList<>(contentIds.size()); |
| for (String contentId : contentIds) { |
| contentIdKeys.add(ACTION_PROPERTIES_PREFIX + contentId); |
| } |
| Result<Map<String, byte[]>> mapResult = contentStorageDirect.get(contentIdKeys); |
| |
| if (mapResult.isSuccessful()) { |
| for (Map.Entry<String, byte[]> entry : mapResult.getValue().entrySet()) { |
| String contentId = entry.getKey().replace(ACTION_PROPERTIES_PREFIX, ""); |
| if (contentIds.contains(contentId)) { |
| try { |
| actionPropertiesWithIds.add( |
| new ActionPropertiesWithId( |
| contentId, OpaqueActionData.parseFrom(entry.getValue()))); |
| } catch (InvalidProtocolBufferException e) { |
| Logger.e( |
| TAG, |
| e, |
| "Error parsing OpaqueActionData for bytes: %s", |
| Base64.encodeToString(entry.getValue(), Base64.DEFAULT)); |
| } |
| } |
| } |
| } else { |
| Logger.e(TAG, "Error fetching action properties for content ids %s", contentIds); |
| tracker.stop("getActionProperties failed", contentIds); |
| } |
| |
| tracker.stop("task", "getActionProperties", "size", actionPropertiesWithIds.size()); |
| return Result.success(actionPropertiesWithIds); |
| } |
| |
| @Override |
| public Result<List<StreamLocalAction>> getAllDismissLocalActions() { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| |
| Result<List<byte[]>> listResult = journalStorageDirect.read(DISMISS_ACTION_JOURNAL); |
| if (!listResult.isSuccessful()) { |
| Logger.e(TAG, "Error retrieving dismiss journal"); |
| tracker.stop("getAllDismissLocalActions failed"); |
| return Result.failure(); |
| } else { |
| List<byte[]> actionByteArrays = listResult.getValue(); |
| List<StreamLocalAction> actions = new ArrayList<>(actionByteArrays.size()); |
| for (byte[] bytes : actionByteArrays) { |
| StreamLocalAction action; |
| try { |
| action = StreamLocalAction.parseFrom(bytes); |
| actions.add(action); |
| } catch (InvalidProtocolBufferException e) { |
| Logger.e( |
| TAG, |
| e, |
| "Error parsing StreamLocalAction for bytes: %s (length %d)", |
| Base64.encodeToString(bytes, Base64.DEFAULT), |
| bytes.length); |
| } |
| } |
| tracker.stop("task", "getAllDismissLocalActions", "size", actions.size()); |
| return Result.success(actions); |
| } |
| } |
| |
| @Override |
| public Result<Set<StreamUploadableAction>> getAllUploadableActions() { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| Result<Map<String, byte[]>> bytesResult = contentStorageDirect.getAll(UPLOADABLE_ACTION_PREFIX); |
| if (!bytesResult.isSuccessful()) { |
| Logger.e(TAG, "Error fetching shared states"); |
| tracker.stop("getAllUploadableActions", "failed"); |
| return Result.failure(); |
| } |
| Collection<byte[]> result = bytesResult.getValue().values(); |
| Set<StreamUploadableAction> uploadableActions = new HashSet<>(); |
| for (byte[] byteArray : result) { |
| try { |
| uploadableActions.add(StreamUploadableAction.parseFrom(byteArray)); |
| } catch (InvalidProtocolBufferException e) { |
| tracker.stop("getAllUploadableActions", "failed"); |
| Logger.e(TAG, e, "Error parsing protocol buffer from bytes %s", byteArray); |
| return Result.failure(); |
| } |
| } |
| tracker.stop("", "getAllUploadableActions", "items", uploadableActions.size()); |
| return Result.success(uploadableActions); |
| } |
| |
| @Override |
| public Result<String> createNewSession() { |
| threadUtils.checkNotMainThread(); |
| |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| String sessionId = storeHelper.getNewStreamSessionId(); |
| journalStorageDirect.commit( |
| new JournalMutation.Builder(HEAD_SESSION_ID).copy(sessionId).build()); |
| tracker.stop("createNewSession", sessionId); |
| return Result.success(sessionId); |
| } |
| |
| @Override |
| public void removeSession(String sessionId) { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| if (sessionId.equals(HEAD_SESSION_ID)) { |
| throw new IllegalStateException("Unable to delete the $HEAD session"); |
| } |
| journalStorageDirect.commit(new JournalMutation.Builder(sessionId).delete().build()); |
| tracker.stop("removeSession", sessionId); |
| } |
| |
| @Override |
| public void clearHead() { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| journalStorageDirect.commit( |
| new JournalMutation.Builder(HEAD_SESSION_ID).delete().append(new byte[0]).build()); |
| tracker.stop("", "clearHead"); |
| } |
| |
| @Override |
| public ContentMutation editContent() { |
| threadUtils.checkNotMainThread(); |
| return new FeedContentMutation(this::commitContentMutation); |
| } |
| |
| @Override |
| public SessionMutation editSession(String sessionId) { |
| threadUtils.checkNotMainThread(); |
| return new FeedSessionMutation( |
| feedSessionMutation -> commitSessionMutation(sessionId, feedSessionMutation)); |
| } |
| |
| @Override |
| public SemanticPropertiesMutation editSemanticProperties() { |
| threadUtils.checkNotMainThread(); |
| return new FeedSemanticPropertiesMutation(this::commitSemanticPropertiesMutation); |
| } |
| |
| @Override |
| public ActionPropertiesMutation editActionProperties() { |
| threadUtils.checkNotMainThread(); |
| return new FeedActionPropertiesMutation(this::commitActionPropertiesMutation); |
| } |
| |
| @Override |
| public LocalActionMutation editLocalActions() { |
| threadUtils.checkNotMainThread(); |
| return new FeedLocalActionMutation(this::commitLocalActionMutation); |
| } |
| |
| @Override |
| public UploadableActionMutation editUploadableActions() { |
| return new FeedUploadableActionMutation(this::commitUploadableActionMutation); |
| } |
| |
| @Override |
| public Runnable triggerContentGc( |
| Set<String> reservedContentIds, |
| Supplier<Set<String>> accessibleContent, |
| boolean keepSharedStates) { |
| Supplier<Set<StreamLocalAction>> dismissActionSupplier = |
| () -> { |
| Result<List<StreamLocalAction>> dismissActionsResult = getAllDismissLocalActions(); |
| |
| if (!dismissActionsResult.isSuccessful()) { |
| // TODO: clean up error condition |
| Logger.e(TAG, "Error retrieving dismiss actions for content garbage collection"); |
| return Collections.emptySet(); |
| } else { |
| return new HashSet<>(dismissActionsResult.getValue()); |
| } |
| }; |
| Supplier<Set<StreamUploadableAction>> uploadableActionSupplier = |
| () -> { |
| Result<Set<StreamUploadableAction>> uploadableActionsResult = getAllUploadableActions(); |
| |
| if (!uploadableActionsResult.isSuccessful()) { |
| // TODO: clean up error condition |
| Logger.e(TAG, "Error retrieving uploadable actions for content garbage collection"); |
| return Collections.emptySet(); |
| } else { |
| return uploadableActionsResult.getValue(); |
| } |
| }; |
| |
| return new ContentGc( |
| accessibleContent, |
| reservedContentIds, |
| dismissActionSupplier, |
| uploadableActionSupplier, |
| contentStorageDirect, |
| timingUtils, |
| keepSharedStates) |
| ::gc; |
| } |
| |
| @Override |
| public Runnable triggerLocalActionGc( |
| List<StreamLocalAction> actions, List<String> validContentIds) { |
| return new LocalActionGc( |
| actions, validContentIds, journalStorageDirect, timingUtils, DISMISS_ACTION_JOURNAL) |
| ::gc; |
| } |
| |
| @Override |
| public void switchToEphemeralMode() { |
| // TODO: implement cleanup |
| } |
| |
| @Override |
| public boolean isEphemeralMode() { |
| return false; |
| } |
| |
| @Override |
| public void registerObserver(StoreListener observer) { |
| throw new UnsupportedOperationException( |
| "PersistentFeedStore does not support observer directly"); |
| } |
| |
| @Override |
| public void unregisterObserver(StoreListener observer) { |
| throw new UnsupportedOperationException( |
| "PersistentFeedStore does not support observer directly"); |
| } |
| |
| private CommitResult commitSemanticPropertiesMutation( |
| Map<String, ByteString> semanticPropertiesMap) { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| Builder mutationBuilder = new Builder(); |
| for (Map.Entry<String, ByteString> entry : semanticPropertiesMap.entrySet()) { |
| mutationBuilder.upsert( |
| SEMANTIC_PROPERTIES_PREFIX + entry.getKey(), entry.getValue().toByteArray()); |
| } |
| CommitResult commitResult = contentStorageDirect.commit(mutationBuilder.build()); |
| tracker.stop( |
| "task", "commitSemanticPropertiesMutation", "mutations", semanticPropertiesMap.size()); |
| return commitResult; |
| } |
| |
| private CommitResult commitActionPropertiesMutation( |
| Map<String, OpaqueActionData> actionPropertiesMap) { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| Builder mutationBuilder = new Builder(); |
| for (Map.Entry<String, OpaqueActionData> entry : actionPropertiesMap.entrySet()) { |
| mutationBuilder.upsert( |
| ACTION_PROPERTIES_PREFIX + entry.getKey(), entry.getValue().toByteArray()); |
| } |
| CommitResult commitResult = contentStorageDirect.commit(mutationBuilder.build()); |
| tracker.stop("task", "commitActionPropertiesMutation", "mutations", actionPropertiesMap.size()); |
| return commitResult; |
| } |
| |
| private Boolean commitSessionMutation(String sessionId, List<StreamStructure> streamStructures) { |
| threadUtils.checkNotMainThread(); |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| JournalMutation.Builder mutation = new JournalMutation.Builder(sessionId); |
| if (streamStructures.isEmpty()) { |
| // allow an empty journal to be created |
| mutation.append(new byte[0]); |
| } |
| for (StreamStructure streamStructure : streamStructures) { |
| mutation.append(streamStructure.toByteArray()); |
| } |
| CommitResult mutationResult = journalStorageDirect.commit(mutation.build()); |
| boolean result = CommitResult.SUCCESS.equals(mutationResult); |
| tracker.stop("", "commitSessionMutation", "mutations", streamStructures.size()); |
| Logger.i( |
| TAG, |
| "commitSessionMutation - Success %s, Update Session %s, stream structures %s", |
| result, |
| sessionId, |
| streamStructures.size()); |
| return result; |
| } |
| |
| private CommitResult commitContentMutation(List<PayloadWithId> mutations) { |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| |
| Builder contentMutationBuilder = new Builder(); |
| for (PayloadWithId mutation : mutations) { |
| String payloadId = mutation.contentId; |
| StreamPayload payload = mutation.payload; |
| if (mutation.payload.hasStreamSharedState()) { |
| StreamSharedState streamSharedState = mutation.payload.getStreamSharedState(); |
| contentMutationBuilder.upsert( |
| SHARED_STATE_PREFIX + streamSharedState.getContentId(), |
| streamSharedState.toByteArray()); |
| } else { |
| contentMutationBuilder.upsert(payloadId, payload.toByteArray()); |
| } |
| } |
| |
| // Block waiting for the response from storage, to make this method synchronous. |
| // TODO: handle errors |
| CommitResult commitResult = contentStorageDirect.commit(contentMutationBuilder.build()); |
| tracker.stop("task", "commitContentMutation", "mutations", mutations.size()); |
| return commitResult; |
| } |
| |
| private CommitResult commitLocalActionMutation(Map<Integer, List<String>> actions) { |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| |
| CommitResult commitResult = CommitResult.SUCCESS; |
| for (Map.Entry<Integer, List<String>> entry : actions.entrySet()) { |
| Integer actionType = entry.getKey(); |
| String journalName; |
| if (ActionType.DISMISS == entry.getKey()) { |
| journalName = DISMISS_ACTION_JOURNAL; |
| } else { |
| Logger.e(TAG, "Unknown journal name for action type %s", actionType); |
| continue; |
| } |
| JournalMutation.Builder builder = new JournalMutation.Builder(journalName); |
| for (String contentId : entry.getValue()) { |
| StreamLocalAction action = |
| StreamLocalAction.newBuilder() |
| .setAction(actionType) |
| .setFeatureContentId(contentId) |
| .setTimestampSeconds(TimeUnit.MILLISECONDS.toSeconds(clock.currentTimeMillis())) |
| .build(); |
| byte[] actionBytes = action.toByteArray(); |
| Logger.i( |
| TAG, |
| "Adding StreamLocalAction bytes %s (length %d) to journal %s", |
| Base64.encodeToString(actionBytes, Base64.DEFAULT), |
| actionBytes.length, |
| journalName); |
| builder.append(actionBytes); |
| } |
| commitResult = journalStorageDirect.commit(builder.build()); |
| if (commitResult == CommitResult.FAILURE) { |
| Logger.e(TAG, "Error committing action for type %s", actionType); |
| break; |
| } |
| } |
| |
| tracker.stop("task", "commitLocalActionMutation", "actions", actions.size()); |
| return commitResult; |
| } |
| |
| private CommitResult commitUploadableActionMutation( |
| Map<String, FeedUploadableActionMutation.FeedUploadableActionChanges> actions) { |
| ElapsedTimeTracker tracker = timingUtils.getElapsedTimeTracker(TAG); |
| |
| Builder contentMutationBuilder = new Builder(); |
| for (Map.Entry<String, FeedUploadableActionMutation.FeedUploadableActionChanges> entry : |
| actions.entrySet()) { |
| String contentId = entry.getKey(); |
| for (StreamUploadableAction removeAction : entry.getValue().removeActions()) { |
| contentMutationBuilder.delete( |
| UPLOADABLE_ACTION_PREFIX + contentId + removeAction.getPayload().hashCode()); |
| } |
| for (StreamUploadableAction upsertAction : entry.getValue().upsertActions()) { |
| contentMutationBuilder.upsert( |
| UPLOADABLE_ACTION_PREFIX + contentId + upsertAction.getPayload().hashCode(), |
| upsertAction.toByteArray()); |
| } |
| } |
| |
| CommitResult commitResult = contentStorageDirect.commit(contentMutationBuilder.build()); |
| tracker.stop("task", "commitUploadableActionMutation", "actions", actions.size()); |
| return commitResult; |
| } |
| |
| @Override |
| public void dump(Dumper dumper) { |
| dumper.title(TAG); |
| if (contentStorageDirect instanceof Dumpable) { |
| dumper.dump((Dumpable) contentStorageDirect); |
| } else { |
| dumper.forKey("contentStorageDirect").value("not dumpable"); |
| } |
| if (journalStorageDirect instanceof Dumpable) { |
| dumper.dump((Dumpable) journalStorageDirect); |
| } else { |
| dumper.forKey("journalStorage").value("not dumpable"); |
| } |
| dumper |
| .forKey("contentIdStringInternerSize") |
| .value(contentIdStringInterner.size()) |
| .compactPrevious(); |
| dumper |
| .forKey("contentIdStringInternerStats") |
| .value(contentIdStringInterner.getStats()) |
| .compactPrevious(); |
| } |
| |
| /** |
| * Clears contents and journals relating to the stream. Leaves actions and semantic properties |
| * intact. |
| * |
| * @return {@code true} when the clear succeeded |
| */ |
| public boolean clearNonActionContent() { |
| return clearContentStorage() && clearJournalStorage(); |
| } |
| |
| /** |
| * Cleans out journal storage (excluding dismiss actions journal) |
| * |
| * @return whether the clear succeeded |
| */ |
| private boolean clearJournalStorage() { |
| Result<List<String>> allJournalsResult = journalStorageDirect.getAllJournals(); |
| if (!allJournalsResult.isSuccessful()) { |
| Logger.e(TAG, "Error clearing all contents. Could not fetch all journals."); |
| return false; |
| } |
| |
| for (String journal : allJournalsResult.getValue()) { |
| if (DISMISS_ACTION_JOURNAL.equals(journal)) { |
| continue; |
| } |
| CommitResult result = |
| journalStorageDirect.commit(new JournalMutation.Builder(journal).delete().build()); |
| if (result != CommitResult.SUCCESS) { |
| Logger.e(TAG, "Error clearing all contents. Could not delete journal %s", journal); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Cleans out content storage (excluding semantic properties) |
| * |
| * @return whether the clear succeeded |
| */ |
| private boolean clearContentStorage() { |
| Result<List<String>> results = contentStorageDirect.getAllKeys(); |
| if (!results.isSuccessful()) { |
| Logger.e(TAG, "Error clearing all contents. Could not fetch all content."); |
| return false; |
| } |
| Builder contentMutationBuilder = new Builder(); |
| for (String key : results.getValue()) { |
| if (key.contains(SEMANTIC_PROPERTIES_PREFIX)) { |
| continue; |
| } |
| contentMutationBuilder.delete(key); |
| } |
| CommitResult result = contentStorageDirect.commit(contentMutationBuilder.build()); |
| if (result != CommitResult.SUCCESS) { |
| Logger.e(TAG, "Error clearing all contents. Could not commit content deletions."); |
| return false; |
| } |
| return true; |
| } |
| |
| /** Wipes all content and journals */ |
| @Override |
| public boolean clearAll() { |
| threadUtils.checkNotMainThread(); |
| |
| boolean success = true; |
| // Run clear on both content and journal |
| CommitResult result = contentStorageDirect.commit(new Builder().deleteAll().build()); |
| CommitResult journalResult = journalStorageDirect.deleteAll(); |
| |
| // Confirm results were successful |
| if (result != CommitResult.SUCCESS) { |
| Logger.e(TAG, "Error clearing all. Could not delete all content from content storage."); |
| success = false; |
| } |
| |
| if (journalResult != CommitResult.SUCCESS) { |
| Logger.e(TAG, "Error clearing all. Could not delete all journals from journal storage."); |
| success = false; |
| } |
| |
| // Try to recreate $HEAD (it needs to exist in order to create new sessions) |
| // #clearHead() also will create it if it does not exist |
| clearHead(); |
| |
| return success; |
| } |
| } |