| // Copyright 2018 The Feed Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.google.android.libraries.feed.feedrequestmanager; |
| |
| import com.google.android.libraries.feed.api.common.ThreadUtils; |
| import com.google.android.libraries.feed.api.protocoladapter.ProtocolAdapter; |
| import com.google.android.libraries.feed.api.requestmanager.ActionUploadRequestManager; |
| import com.google.android.libraries.feed.common.Result; |
| import com.google.android.libraries.feed.common.concurrent.TaskQueue; |
| import com.google.android.libraries.feed.common.concurrent.TaskQueue.TaskType; |
| import com.google.android.libraries.feed.common.functional.Consumer; |
| import com.google.android.libraries.feed.common.logging.Logger; |
| import com.google.android.libraries.feed.common.protoextensions.FeedExtensionRegistry; |
| import com.google.android.libraries.feed.common.time.Clock; |
| import com.google.android.libraries.feed.host.config.Configuration; |
| import com.google.android.libraries.feed.host.config.Configuration.ConfigKey; |
| import com.google.android.libraries.feed.host.network.HttpRequest; |
| import com.google.android.libraries.feed.host.network.HttpRequest.HttpMethod; |
| import com.google.android.libraries.feed.host.network.NetworkClient; |
| import com.google.android.libraries.feed.host.storage.CommitResult; |
| import com.google.android.libraries.feed.internalapi.actionmanager.ActionReader; |
| import com.google.android.libraries.feed.internalapi.store.Store; |
| import com.google.android.libraries.feed.internalapi.store.UploadableActionMutation; |
| import com.google.search.now.feed.client.StreamDataProto.StreamUploadableAction; |
| import com.google.search.now.wire.feed.ConsistencyTokenProto.ConsistencyToken; |
| import com.google.search.now.wire.feed.FeedActionResponseProto.FeedActionResponse; |
| import com.google.search.now.wire.feed.ResponseProto.Response; |
| import java.io.IOException; |
| import java.io.UnsupportedEncodingException; |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| |
| /** Default implementation of ActionUploadRequestManager. */ |
| public final class FeedActionUploadRequestManager implements ActionUploadRequestManager { |
| private static final String TAG = "FeedActionUploadRequestManager"; |
| |
| private final Configuration configuration; |
| private final NetworkClient networkClient; |
| private final ProtocolAdapter protocolAdapter; |
| private final FeedExtensionRegistry extensionRegistry; |
| private final TaskQueue taskQueue; |
| private final ThreadUtils threadUtils; |
| private final ActionReader actionReader; |
| private final Store store; |
| private final Clock clock; |
| private final int maxActionUploadAttempts; |
| // Total number of actions that can be uploaded in a chained batch request. |
| private final int maxActionsUploadsPerBatchedRequest; |
| // Maximum bytes that can be uploaded in a single request. |
| private final int maxBytesPerRequest; |
| private final long maxActionUploadTtl; |
| |
| public FeedActionUploadRequestManager( |
| Configuration configuration, |
| NetworkClient networkClient, |
| ProtocolAdapter protocolAdapter, |
| FeedExtensionRegistry extensionRegistry, |
| TaskQueue taskQueue, |
| ThreadUtils threadUtils, |
| ActionReader actionReader, |
| Store store, |
| Clock clock) { |
| this.configuration = configuration; |
| this.networkClient = networkClient; |
| this.protocolAdapter = protocolAdapter; |
| this.extensionRegistry = extensionRegistry; |
| this.taskQueue = taskQueue; |
| this.threadUtils = threadUtils; |
| this.actionReader = actionReader; |
| this.store = store; |
| this.clock = clock; |
| maxBytesPerRequest = |
| configuration.getValueOrDefault(ConfigKey.FEED_ACTION_SERVER_MAX_SIZE_PER_REQUEST, 4000); |
| maxActionsUploadsPerBatchedRequest = |
| configuration.getValueOrDefault(ConfigKey.FEED_ACTION_SERVER_MAX_ACTIONS_PER_REQUEST, 10); |
| maxActionUploadAttempts = |
| configuration.getValueOrDefault(ConfigKey.FEED_ACTION_MAX_UPLOAD_ATTEMPTS, 1); |
| maxActionUploadTtl = configuration.getValueOrDefault(ConfigKey.FEED_ACTION_TTL_SECONDS, 0L); |
| } |
| |
| @Override |
| public void triggerUploadActions( |
| Set<StreamUploadableAction> actions, |
| ConsistencyToken token, |
| Consumer<Result<ConsistencyToken>> consumer) { |
| if (maxActionUploadAttempts == 0 |
| || maxBytesPerRequest == 0 |
| || maxActionsUploadsPerBatchedRequest == 0) { |
| consumer.accept(Result.success(token)); |
| return; |
| } |
| triggerUploadActions(actions, token, consumer, /* uploadCount= */ 0); |
| } |
| |
| private void triggerUploadActions( |
| Set<StreamUploadableAction> actions, |
| ConsistencyToken token, |
| Consumer<Result<ConsistencyToken>> consumer, |
| int uploadCount) { |
| threadUtils.checkNotMainThread(); |
| // Return the token if there are no actions to upload. |
| if (actions.isEmpty() || uploadCount >= maxActionsUploadsPerBatchedRequest) { |
| consumer.accept(Result.success(token)); |
| return; |
| } |
| UploadableActionsRequestBuilder requestBuilder = |
| new UploadableActionsRequestBuilder(actionReader, protocolAdapter); |
| int actionPayloadBytes = 0; |
| Set<StreamUploadableAction> actionsToUpload = new HashSet<>(); |
| for (StreamUploadableAction action : actions) { |
| int actionBytes = action.toByteArray().length; |
| if (maxBytesPerRequest > actionPayloadBytes + actionBytes) { |
| actionsToUpload.add(action); |
| actionPayloadBytes += actionBytes; |
| } else { |
| break; |
| } |
| } |
| Consumer<Result<ConsistencyToken>> tokenConsumer = |
| result -> { |
| threadUtils.checkNotMainThread(); |
| if (result.isSuccessful()) { |
| actions.removeAll(actionsToUpload); |
| if (!actions.isEmpty()) { |
| triggerUploadActions( |
| actions, result.getValue(), consumer, uploadCount + actionsToUpload.size()); |
| } else { |
| consumer.accept(Result.success(result.getValue())); |
| } |
| } else { |
| consumer.accept(uploadCount == 0 ? Result.failure() : Result.success(token)); |
| } |
| }; |
| requestBuilder.setConsistencyToken(token).setActions(actionsToUpload); |
| executeUploadActionRequest(actionsToUpload, requestBuilder, tokenConsumer); |
| } |
| |
| private void executeUploadActionRequest( |
| Set<StreamUploadableAction> actions, |
| UploadableActionsRequestBuilder requestBuilder, |
| Consumer<Result<ConsistencyToken>> consumer) { |
| threadUtils.checkNotMainThread(); |
| |
| String endpoint = configuration.getValueOrDefault(ConfigKey.FEED_ACTION_SERVER_ENDPOINT, ""); |
| @HttpMethod |
| String httpMethod = |
| configuration.getValueOrDefault(ConfigKey.FEED_ACTION_SERVER_METHOD, HttpMethod.POST); |
| HttpRequest httpRequest = |
| RequestHelper.buildHttpRequest( |
| httpMethod, requestBuilder.build().toByteArray(), endpoint, /* locale= */ ""); |
| |
| Logger.i(TAG, "Making Request: %s", httpRequest.getUri().getPath()); |
| networkClient.send( |
| httpRequest, |
| input -> { |
| Logger.i( |
| TAG, |
| "Request: %s completed with response code: %s", |
| httpRequest.getUri().getPath(), |
| input.getResponseCode()); |
| if (input.getResponseCode() != 200) { |
| String errorBody = null; |
| try { |
| errorBody = new String(input.getResponseBody(), "UTF-8"); |
| } catch (UnsupportedEncodingException e) { |
| Logger.e(TAG, "Error handling http error logging", e); |
| } |
| Logger.e(TAG, "errorCode: %d", input.getResponseCode()); |
| Logger.e(TAG, "errorResponse: %s", errorBody); |
| taskQueue.execute( |
| "executeUploadActionRequest", |
| TaskType.IMMEDIATE, |
| () -> { |
| consumer.accept(Result.failure()); |
| }); |
| return; |
| } |
| handleUploadableActionResponseBytes(actions, input.getResponseBody(), consumer); |
| }); |
| } |
| |
| private void handleUploadableActionResponseBytes( |
| Set<StreamUploadableAction> actions, |
| final byte[] responseBytes, |
| final Consumer<Result<ConsistencyToken>> consumer) { |
| taskQueue.execute( |
| "handleUploadableActionResponseBytes", |
| TaskType.IMMEDIATE, |
| () -> { |
| Response response; |
| boolean isLengthPrefixed = |
| configuration.getValueOrDefault( |
| ConfigKey.FEED_ACTION_SERVER_RESPONSE_LENGTH_PREFIXED, true); |
| try { |
| response = |
| Response.parseFrom( |
| isLengthPrefixed |
| ? RequestHelper.getLengthPrefixedValue(responseBytes) |
| : responseBytes, |
| extensionRegistry.getExtensionRegistry()); |
| } catch (IOException e) { |
| Logger.e(TAG, e, "Response parse failed"); |
| handleUpdatingActionsOnFailure(actions); |
| consumer.accept(Result.failure()); |
| return; |
| } |
| FeedActionResponse feedActionResponse = |
| response.getExtension(FeedActionResponse.feedActionResponse); |
| final Result<ConsistencyToken> contextResult; |
| if (feedActionResponse.hasConsistencyToken()) { |
| contextResult = Result.success(feedActionResponse.getConsistencyToken()); |
| UploadableActionMutation actionMutation = store.editUploadableActions(); |
| for (StreamUploadableAction action : actions) { |
| actionMutation.remove(action, action.getFeatureContentId()); |
| } |
| CommitResult commitResult = actionMutation.commit(); |
| if (commitResult != CommitResult.SUCCESS) { |
| // TODO:log failure to the basicLoggingApi |
| Logger.e(TAG, "Removing actions on success failed"); |
| } |
| } else { |
| contextResult = Result.failure(); |
| handleUpdatingActionsOnFailure(actions); |
| } |
| consumer.accept(contextResult); |
| }); |
| } |
| |
| private void handleUpdatingActionsOnFailure(Set<StreamUploadableAction> actions) { |
| UploadableActionMutation actionMutation = store.editUploadableActions(); |
| for (StreamUploadableAction action : actions) { |
| int uploadAttempts = action.getUploadAttempts(); |
| long currentTime = TimeUnit.MILLISECONDS.toSeconds(clock.currentTimeMillis()); |
| long timeSinceUpload = currentTime - action.getTimestampSeconds(); |
| if (uploadAttempts < maxActionUploadAttempts && timeSinceUpload < maxActionUploadTtl) { |
| actionMutation.upsert( |
| action.toBuilder().setUploadAttempts(uploadAttempts + 1).build(), |
| action.getFeatureContentId()); |
| } else { |
| actionMutation.remove(action, action.getFeatureContentId()); |
| } |
| CommitResult commitResult = actionMutation.commit(); |
| if (commitResult != CommitResult.SUCCESS) { |
| // TODO:log failure to the basicLoggingApi |
| Logger.e(TAG, "Upserting actions on failure failed"); |
| } |
| } |
| } |
| } |