blob: 131de39df883e1a4f1b15539876d62c12c90d18d [file] [log] [blame]
/*
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.ipc.invalidation.ticl;
import com.google.ipc.invalidation.external.client.SystemResources;
import com.google.ipc.invalidation.external.client.SystemResources.Logger;
import com.google.ipc.invalidation.external.client.SystemResources.NetworkChannel;
import com.google.ipc.invalidation.external.client.SystemResources.Scheduler;
import com.google.ipc.invalidation.external.client.types.SimplePair;
import com.google.ipc.invalidation.ticl.InvalidationClientCore.BatchingTask;
import com.google.ipc.invalidation.ticl.Statistics.ClientErrorType;
import com.google.ipc.invalidation.ticl.Statistics.ReceivedMessageType;
import com.google.ipc.invalidation.ticl.Statistics.SentMessageType;
import com.google.ipc.invalidation.ticl.proto.ClientConstants;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ApplicationClientIdP;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientConfigP;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientHeader;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientToServerMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientVersion;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ConfigChangeMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ErrorMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoRequestMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage.DigestSerializationType;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationP;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ObjectIdP;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.PropertyRecord;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ProtocolHandlerConfigP;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RateLimitP;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP.OpType;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationStatusMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSubtree;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSummary;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncRequestMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerHeader;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerToClientMessage;
import com.google.ipc.invalidation.ticl.proto.ClientProtocol.TokenControlMessage;
import com.google.ipc.invalidation.ticl.proto.CommonProtos;
import com.google.ipc.invalidation.ticl.proto.JavaClient.BatcherState;
import com.google.ipc.invalidation.ticl.proto.JavaClient.ProtocolHandlerState;
import com.google.ipc.invalidation.util.Bytes;
import com.google.ipc.invalidation.util.InternalBase;
import com.google.ipc.invalidation.util.Marshallable;
import com.google.ipc.invalidation.util.Preconditions;
import com.google.ipc.invalidation.util.ProtoWrapper;
import com.google.ipc.invalidation.util.ProtoWrapper.ValidationException;
import com.google.ipc.invalidation.util.Smearer;
import com.google.ipc.invalidation.util.TextBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A layer for interacting with low-level protocol messages. Parses messages from the server and
* calls appropriate functions on the {@code ProtocolListener} to handle various types of message
* content. Also buffers message data from the client and constructs and sends messages to the
* server.
* <p>
* This class implements {@link Marshallable}, so its state can be written to a protocol buffer,
* and instances can be restored from such protocol buffers. Additionally, the nested class
* {@link Batcher} also implements {@code Marshallable} for the same reason.
* <p>
* Note that while we talk about "marshalling," in this context we mean marshalling to protocol
* buffers, not raw bytes.
*
*/
class ProtocolHandler implements Marshallable<ProtocolHandlerState> {
/**
 * Class that batches messages to the server. Registrations, acknowledgements, registration
 * subtrees and at most one pending initialize and one pending info message are accumulated here
 * until {@link #toMessage} folds them into a single {@link ClientToServerMessage}.
 */
private static class Batcher implements Marshallable<BatcherState> {
  /** Statistics to be updated when messages are created. */
  private final Statistics statistics;

  /** Resources; used by this class to obtain the logger. */
  private final SystemResources resources;

  /**
   * Pending (un)registrations, keyed by object id so that a later operation on the same object
   * overrides an earlier one. Values are {@code RegistrationP.OpType} constants.
   */
  private final Map<ObjectIdP, Integer> pendingRegistrations = new HashMap<ObjectIdP, Integer>();

  /** Set of pending invalidation acks. */
  private final Set<InvalidationP> pendingAckedInvalidations = new HashSet<InvalidationP>();

  /** Set of pending registration sub trees for registration sync. */
  private final Set<RegistrationSubtree> pendingRegSubtrees = new HashSet<RegistrationSubtree>();

  /** Pending initialization message to send to the server, if any. */
  private InitializeMessage pendingInitializeMessage = null;

  /** Pending info message to send to the server, if any. */
  private InfoMessage pendingInfoMessage = null;

  /** Creates an empty batcher. */
  Batcher(SystemResources resources, Statistics statistics) {
    this.resources = resources;
    this.statistics = statistics;
  }

  /**
   * Creates a batcher whose pending data is restored from {@code marshalledState}.
   * <p>
   * Registrations are restored before unregistrations, so if an object id appears in both lists
   * the unregistration is the operation that remains pending.
   */
  Batcher(SystemResources resources, Statistics statistics, BatcherState marshalledState) {
    this(resources, statistics);
    for (ObjectIdP registration : marshalledState.getRegistration()) {
      pendingRegistrations.put(registration, RegistrationP.OpType.REGISTER);
    }
    for (ObjectIdP unregistration : marshalledState.getUnregistration()) {
      pendingRegistrations.put(unregistration, RegistrationP.OpType.UNREGISTER);
    }
    for (InvalidationP ack : marshalledState.getAcknowledgement()) {
      pendingAckedInvalidations.add(ack);
    }
    for (RegistrationSubtree subtree : marshalledState.getRegistrationSubtree()) {
      pendingRegSubtrees.add(subtree);
    }
    pendingInitializeMessage = marshalledState.getNullableInitializeMessage();
    if (marshalledState.hasInfoMessage()) {
      pendingInfoMessage = marshalledState.getInfoMessage();
    }
  }

  /** Sets the initialize message to be sent, replacing any previously pending one. */
  void setInitializeMessage(InitializeMessage msg) {
    pendingInitializeMessage = msg;
  }

  /** Sets the info message to be sent, replacing any previously pending one. */
  void setInfoMessage(InfoMessage msg) {
    pendingInfoMessage = msg;
  }

  /**
   * Adds a registration on {@code oid} of {@code opType} to the registrations to be sent. Any
   * operation already pending for {@code oid} is overridden.
   */
  void addRegistration(ObjectIdP oid, Integer opType) {
    pendingRegistrations.put(oid, opType);
  }

  /** Adds {@code ack} to the set of acknowledgements to be sent. */
  void addAck(InvalidationP ack) {
    pendingAckedInvalidations.add(ack);
  }

  /** Adds {@code subtree} to the set of registration subtrees to be sent. */
  void addRegSubtree(RegistrationSubtree subtree) {
    pendingRegSubtrees.add(subtree);
  }

  /**
   * Builds a {@link ClientToServerMessage} that carries {@code header} together with all data
   * currently batched, and removes that data from the batcher. A statistics entry is recorded
   * for each kind of sub-message that is included.
   * <p>
   * If several registration subtrees are pending, only one of them is included in the message
   * and the others are discarded.
   *
   * @param header header to place on the message
   * @param hasClientToken whether the client currently holds a token
   * @return the message to send, or {@code null} if the client holds no token and no initialize
   *     message is pending; in that case the batched data is left untouched
   */
  ClientToServerMessage toMessage(final ClientHeader header, boolean hasClientToken) {
    // Consume the pending initialize message, if there is one.
    InitializeMessage initializeMessage = pendingInitializeMessage;
    if (initializeMessage != null) {
      statistics.recordSentMessage(SentMessageType.INITIALIZE);
      pendingInitializeMessage = null;
    }

    // Note: Even if an initialize message is being sent, we can send additional
    // messages such as registration messages, etc to the server. But if there is no token
    // and an initialize message is not being sent, we cannot send any other message.
    if (!hasClientToken && (initializeMessage == null)) {
      // Cannot send any message
      resources.getLogger().warning(
          "Cannot send message since no token and no initialize msg");
      statistics.recordError(ClientErrorType.TOKEN_MISSING_FAILURE);
      return null;
    }

    // Check for pending batched operations and add them to the message if needed.
    // Each group (acks, regs, reg subtrees) is cleared once it has been added.
    InvalidationMessage invalidationAckMessage = null;
    if (!pendingAckedInvalidations.isEmpty()) {
      invalidationAckMessage = createInvalidationAckMessage();
      statistics.recordSentMessage(SentMessageType.INVALIDATION_ACK);
    }

    // Check regs.
    RegistrationMessage registrationMessage = null;
    if (!pendingRegistrations.isEmpty()) {
      registrationMessage = createRegistrationMessage();
      statistics.recordSentMessage(SentMessageType.REGISTRATION);
    }

    // Check reg subtrees.
    RegistrationSyncMessage registrationSyncMessage = null;
    if (!pendingRegSubtrees.isEmpty()) {
      // If there are multiple pending reg subtrees, only one is sent.
      ArrayList<RegistrationSubtree> regSubtrees = new ArrayList<RegistrationSubtree>(1);
      regSubtrees.add(pendingRegSubtrees.iterator().next());
      registrationSyncMessage = RegistrationSyncMessage.create(regSubtrees);
      pendingRegSubtrees.clear();
      statistics.recordSentMessage(SentMessageType.REGISTRATION_SYNC);
    }

    // Check if an info message has to be sent.
    InfoMessage infoMessage = pendingInfoMessage;
    if (infoMessage != null) {
      statistics.recordSentMessage(SentMessageType.INFO);
      pendingInfoMessage = null;
    }

    return ClientToServerMessage.create(header, initializeMessage, registrationMessage,
        registrationSyncMessage, invalidationAckMessage, infoMessage);
  }

  /**
   * Creates a registration message based on registrations from {@code pendingRegistrations},
   * clears {@code pendingRegistrations}, and returns the message.
   * <p>
   * REQUIRES: pendingRegistrations.size() > 0
   */
  private RegistrationMessage createRegistrationMessage() {
    Preconditions.checkState(!pendingRegistrations.isEmpty());

    // Convert each (object id, op type) entry of the map into a RegistrationP.
    List<RegistrationP> registrations =
        new ArrayList<RegistrationP>(pendingRegistrations.size());
    for (Map.Entry<ObjectIdP, Integer> entry : pendingRegistrations.entrySet()) {
      registrations.add(RegistrationP.create(entry.getKey(), entry.getValue()));
    }
    pendingRegistrations.clear();
    return RegistrationMessage.create(registrations);
  }

  /**
   * Creates an invalidation ack message based on acks from {@code pendingAckedInvalidations},
   * clears {@code pendingAckedInvalidations}, and returns the message.
   * <p>
   * REQUIRES: pendingAckedInvalidations.size() > 0
   */
  private InvalidationMessage createInvalidationAckMessage() {
    Preconditions.checkState(!pendingAckedInvalidations.isEmpty());
    // Copy the acks into a fresh list before clearing the set.
    InvalidationMessage ackMessage =
        InvalidationMessage.create(new ArrayList<InvalidationP>(pendingAckedInvalidations));
    pendingAckedInvalidations.clear();
    return ackMessage;
  }

  /**
   * Returns the pending data of this batcher as a {@link BatcherState}. The batcher itself is
   * not modified.
   *
   * @throws IllegalArgumentException if a pending registration has an op type other than
   *     {@code OpType.REGISTER} or {@code OpType.UNREGISTER}
   */
  @Override
  public BatcherState marshal() {
    // Marshall (un)registrations: split the single map into one list per operation type.
    ArrayList<ObjectIdP> registrations = new ArrayList<ObjectIdP>(pendingRegistrations.size());
    ArrayList<ObjectIdP> unregistrations = new ArrayList<ObjectIdP>(pendingRegistrations.size());
    for (Map.Entry<ObjectIdP, Integer> entry : pendingRegistrations.entrySet()) {
      Integer opType = entry.getValue();
      ObjectIdP oid = entry.getKey();
      switch (opType) {
        case OpType.REGISTER:
          registrations.add(oid);
          break;
        case OpType.UNREGISTER:
          unregistrations.add(oid);
          break;
        default:
          throw new IllegalArgumentException(opType.toString());
      }
    }
    return BatcherState.create(registrations, unregistrations, pendingAckedInvalidations,
        pendingRegSubtrees, pendingInitializeMessage, pendingInfoMessage);
  }
}
/**
 * Header of a message received from the server: the token the server sent together with the
 * server's summary of the client's registration state.
 */
static class ServerMessageHeader extends InternalBase {
  /** Server-sent token. */
  Bytes token;

  /** Summary of the client's registration state at the server. */
  RegistrationSummary registrationSummary;

  /**
   * Constructs an instance.
   *
   * @param token server-sent token
   * @param registrationSummary summary over server registration state
   */
  ServerMessageHeader(Bytes token, RegistrationSummary registrationSummary) {
    this.registrationSummary = registrationSummary;
    this.token = token;
  }

  /** Appends the token and the registration summary to {@code builder}. */
  @Override
  public void toCompactString(TextBuilder builder) {
    builder.appendFormat("Token: %s, Summary: %s", this.token, this.registrationSummary);
  }
}
/**
 * Representation of a message received from the server. Such a message is guaranteed to be
 * valid, but the session token is <b>not</b> checked.
 */
static class ParsedMessage {
  /*
   * Apart from the header, every field mirrors one optional field of the ServerToClientMessage
   * protobuf: it is non-null iff the corresponding hasYYY method in the protobuf would return
   * true.
   */
  final ServerMessageHeader header;
  final TokenControlMessage tokenControlMessage;
  final InvalidationMessage invalidationMessage;
  final RegistrationStatusMessage registrationStatusMessage;
  final RegistrationSyncRequestMessage registrationSyncRequestMessage;
  final ConfigChangeMessage configChangeMessage;
  final InfoRequestMessage infoRequestMessage;
  final ErrorMessage errorMessage;

  /** Constructs an instance from a {@code rawMessage}. */
  ParsedMessage(ServerToClientMessage rawMessage) {
    // The header is always built, from the client token and the (possibly null) summary.
    ServerHeader serverHeader = rawMessage.getHeader();
    header = new ServerMessageHeader(
        serverHeader.getClientToken(), serverHeader.getNullableRegistrationSummary());

    // Every remaining field is copied if present in the protobuf and left null otherwise.
    if (rawMessage.hasTokenControlMessage()) {
      tokenControlMessage = rawMessage.getTokenControlMessage();
    } else {
      tokenControlMessage = null;
    }
    invalidationMessage = rawMessage.getNullableInvalidationMessage();
    registrationStatusMessage = rawMessage.getNullableRegistrationStatusMessage();
    if (rawMessage.hasRegistrationSyncRequestMessage()) {
      registrationSyncRequestMessage = rawMessage.getRegistrationSyncRequestMessage();
    } else {
      registrationSyncRequestMessage = null;
    }
    if (rawMessage.hasConfigChangeMessage()) {
      configChangeMessage = rawMessage.getConfigChangeMessage();
    } else {
      configChangeMessage = null;
    }
    infoRequestMessage = rawMessage.getNullableInfoRequestMessage();
    errorMessage = rawMessage.getNullableErrorMessage();
  }
}
/**
 * Listener for protocol events. The handler guarantees that the call will be made on the internal
 * thread that the SystemResources provides.
 * <p>
 * Besides being notified of sent messages, the listener is also queried by the handler for the
 * client token and registration summary that go into the header of every outgoing message.
 */
interface ProtocolListener {
/**
 * Records that a message was sent to the server at the current time. The handler invokes this
 * directly (not through the scheduler) from {@code sendMessageToServer}, right after handing the
 * message to the network channel.
 */
void handleMessageSent();
/**
 * Returns a summary of the current desired registrations. The handler includes it in the header
 * of each outgoing message.
 */
RegistrationSummary getRegistrationSummary();
/**
 * Returns the current server-assigned client token, if any. The handler treats a {@code null}
 * return as "no token" and includes the returned value in the header of each outgoing message.
 */
Bytes getClientToken();
}
/**
 * Information about the client, e.g., application name, OS, etc. Built once in the constructor
 * and included in info messages.
 */
private final ClientVersion clientVersion;
/** A logger. */
private final Logger logger;
/**
 * Scheduler for the client's internal processing. Also used to assert that methods run on the
 * internal thread and as the source of the current time.
 */
private final Scheduler internalScheduler;
/** Network channel for sending and receiving messages to and from the server. */
private final NetworkChannel network;
/** The protocol listener. */
private final ProtocolListener listener;
/** Batches messages to the server. */
private final Batcher batcher;
/**
 * A debug message id that is added to every message to the server. It is written to the message
 * header as a decimal string and incremented each time {@code sendMessageToServer} has built a
 * message.
 */
private int messageId = 1;
// State specific to a client. If we want to support multiple clients, this could
// be in a map or could be eliminated (e.g., no batching).
/**
 * The last known time from the server: the largest server time seen in the header of a message
 * accepted by {@code handleIncomingMessage}.
 */
private long lastKnownServerTimeMs = 0;
/**
 * The next time before which a message cannot be sent to the server. If this is less than current
 * time, a message can be sent at any time. Updated when the server sends a config change message
 * that carries a next-message delay.
 */
private long nextMessageSendTimeMs = 0;
/** Statistics objects to track number of sent messages, etc. */
private final Statistics statistics;
/** Client type for inclusion in headers. */
private final int clientType;
/**
 * Creates an instance, either fresh or restored from previously marshalled state.
 *
 * @param config configuration for the client (not read by this constructor)
 * @param resources resources to use
 * @param smearer a smearer to randomize delays (not read by this constructor)
 * @param statistics track information about messages sent/received, etc
 * @param clientType client type to include in message headers and initialize messages
 * @param applicationName name of the application using the library (for debugging/monitoring)
 * @param listener callback for protocol events
 * @param marshalledState state to restore the handler from, or {@code null} to start clean
 */
ProtocolHandler(ProtocolHandlerConfigP config, final SystemResources resources,
    Smearer smearer, Statistics statistics, int clientType, String applicationName,
    ProtocolListener listener, ProtocolHandlerState marshalledState) {
  // Collaborators handed in directly.
  this.statistics = statistics;
  this.listener = listener;
  this.clientType = clientType;

  // Collaborators obtained from the system resources.
  this.logger = resources.getLogger();
  this.internalScheduler = resources.getInternalScheduler();
  this.network = resources.getNetwork();
  this.clientVersion =
      CommonProtos.newClientVersion(resources.getPlatform(), "Java", applicationName);

  if (marshalledState != null) {
    // Restore the batcher and the counters/timestamps from the marshalled state.
    this.batcher = new Batcher(resources, statistics, marshalledState.getBatcherState());
    this.messageId = marshalledState.getMessageId();
    this.lastKnownServerTimeMs = marshalledState.getLastKnownServerTimeMs();
    this.nextMessageSendTimeMs = marshalledState.getNextMessageSendTimeMs();
  } else {
    // Nothing to restore: start with a clean batcher and the default field values.
    this.batcher = new Batcher(resources, statistics);
  }

  logger.info("Created protocol handler for application %s, platform %s", applicationName,
      resources.getPlatform());
}
/**
 * Returns a default config for the protocol handler: no explicit batching delay and a single
 * rate limit of at most 3 messages every 5 seconds.
 */
static ProtocolHandlerConfigP createConfig() {
  int rateWindowMs = 5 * 1000;
  int maxMessagesInWindow = 3;
  RateLimitP defaultLimit = RateLimitP.create(rateWindowMs, maxMessagesInWindow);

  List<RateLimitP> limits = new ArrayList<RateLimitP>();
  limits.add(defaultLimit);
  return ProtocolHandlerConfigP.create(null, limits);
}
/**
 * Returns a configuration object with parameters set for unit tests: a batching delay of 200
 * and an empty list of rate limits.
 */
static ProtocolHandlerConfigP createConfigForTest() {
  int testBatchingDelay = 200;
  // An empty list means no rate limits.
  ArrayList<RateLimitP> noRateLimits = new ArrayList<RateLimitP>(0);
  return ProtocolHandlerConfigP.create(testBatchingDelay, noRateLimits);
}
/**
 * Returns the earliest time at which the next message may be sent to the server. This is usually
 * a time in the past, in which case the client may send a message whenever it wants.
 */
public long getNextMessageSendTimeMsForTest() {
  return this.nextMessageSendTimeMs;
}
/**
 * Handles a message from the server. Returns a {@link ParsedMessage} for the message if it can be
 * processed, i.e., it parses as a valid message, has the right major protocol version, and is
 * not a silence (config change) message. Returns {@code null} in every other case.
 * <p>
 * Silence messages are intercepted and processed here: the quiet period is recorded and all other
 * data in the message is discarded.
 * <p>
 * Note that this method does <b>not</b> check the session token of any message.
 *
 * @param incomingMessage serialized message received from the server
 */
ParsedMessage handleIncomingMessage(byte[] incomingMessage) {
  Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");

  // Step 1: parse and validate.
  ServerToClientMessage serverMessage;
  try {
    serverMessage = ServerToClientMessage.parseFrom(incomingMessage);
  } catch (ValidationException invalidMessage) {
    statistics.recordError(ClientErrorType.INCOMING_MESSAGE_FAILURE);
    logger.warning("Incoming message is invalid: %s", Bytes.toLazyCompactString(incomingMessage));
    return null;
  }

  // Step 2: reject messages whose major protocol version differs from ours.
  boolean majorVersionMatches =
      serverMessage.getHeader().getProtocolVersion().getVersion().getMajorVersion()
          == ClientConstants.PROTOCOL_MAJOR_VERSION;
  if (!majorVersionMatches) {
    statistics.recordError(ClientErrorType.PROTOCOL_VERSION_FAILURE);
    logger.severe("Dropping message with incompatible version: %s", serverMessage);
    return null;
  }

  // Step 3: a ConfigChangeMessage tells us to stop sending messages for a certain duration.
  // This is handled before the token is even checked.
  if (serverMessage.hasConfigChangeMessage()) {
    ConfigChangeMessage quietPeriodRequest = serverMessage.getConfigChangeMessage();
    statistics.recordReceivedMessage(ReceivedMessageType.CONFIG_CHANGE);
    // Validator has ensured that the delay, when present, is positive.
    if (quietPeriodRequest.hasNextMessageDelayMs()) {
      nextMessageSendTimeMs =
          internalScheduler.getCurrentTimeMs() + quietPeriodRequest.getNextMessageDelayMs();
    }
    // Ignore all other messages in the envelope.
    return null;
  }

  // Step 4: an ordinary message. Remember the newest server time and hand it back.
  lastKnownServerTimeMs =
      Math.max(lastKnownServerTimeMs, serverMessage.getHeader().getServerTimeMs());
  return new ParsedMessage(serverMessage);
}
/**
 * Batches an initialize message, which requests a client token from the server, and makes sure
 * the batching task is scheduled so that the message eventually gets sent.
 *
 * @param applicationClientId application-specific client id
 * @param nonce nonce for the request
 * @param batchingTask task that flushes the batched messages to the server
 * @param debugString information to identify the caller
 */
void sendInitializeMessage(ApplicationClientIdP applicationClientId, Bytes nonce,
    BatchingTask batchingTask, String debugString) {
  Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");

  boolean clientTypeMatches = applicationClientId.getClientType() == clientType;
  if (!clientTypeMatches) {
    // A mismatch is not fatal, but it probably indicates a bug somewhere.
    logger.warning(
        "Client type in application id does not match constructor-provided type: %s vs %s",
        applicationClientId, clientType);
  }

  // The message is only stored in the batcher here; it is sent when the batching task runs.
  InitializeMessage request = InitializeMessage.create(
      clientType, nonce, applicationClientId, DigestSerializationType.BYTE_BASED);
  batcher.setInitializeMessage(request);
  logger.info("Batching initialize message for client: %s, %s", debugString, request);
  batchingTask.ensureScheduled(debugString);
}
/**
 * Batches an info message for the server that carries the supplied performance counters and
 * client config, and makes sure the batching task is scheduled.
 *
 * @param performanceCounters (name, value) pairs to report as property records
 * @param clientConfig client configuration to include in the message
 * @param requestServerRegistrationSummary indicates whether to request the
 *     server's registration summary
 * @param batchingTask task that flushes the batched messages to the server
 */
void sendInfoMessage(List<SimplePair<String, Integer>> performanceCounters,
    ClientConfigP clientConfig, boolean requestServerRegistrationSummary,
    BatchingTask batchingTask) {
  Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");

  // Turn each (name, value) pair into a property record.
  List<PropertyRecord> counterRecords =
      new ArrayList<PropertyRecord>(performanceCounters.size());
  for (SimplePair<String, Integer> nameAndValue : performanceCounters) {
    PropertyRecord record = PropertyRecord.create(nameAndValue.first, nameAndValue.second);
    counterRecords.add(record);
  }

  // The message is only stored in the batcher here; it is sent when the batching task runs.
  batcher.setInfoMessage(InfoMessage.create(clientVersion, /* configParameter */ null,
      counterRecords, requestServerRegistrationSummary, clientConfig));
  batchingTask.ensureScheduled("Send-info");
}
/**
 * Batches a (un)registration request for each of the given objects and makes sure the batching
 * task is scheduled.
 *
 * @param objectIds object ids on which to (un)register
 * @param regOpType whether to register or unregister
 * @param batchingTask task that flushes the batched messages to the server
 */
void sendRegistrations(Collection<ObjectIdP> objectIds, Integer regOpType,
    BatchingTask batchingTask) {
  Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");

  // Every object id gets the same operation type.
  for (ObjectIdP oid : objectIds) {
    this.batcher.addRegistration(oid, regOpType);
  }
  batchingTask.ensureScheduled("Send-registrations");
}
/**
 * Batches an acknowledgement for {@code invalidation} and makes sure the batching task is
 * scheduled so that the ack is sent to the server.
 *
 * @param invalidation invalidation being acknowledged
 * @param batchingTask task that flushes the batched messages to the server
 */
void sendInvalidationAck(InvalidationP invalidation, BatchingTask batchingTask) {
  Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");

  // Acks could be summarized when there are suppressing invalidations, but that is unlikely
  // to be beneficial enough here, so each ack is batched individually.
  logger.fine("Sending ack for invalidation %s", invalidation);
  this.batcher.addAck(invalidation);
  batchingTask.ensureScheduled("Send-Ack");
}
/**
 * Batches a single registration subtree and makes sure the batching task is scheduled so that
 * the subtree is sent to the server.
 *
 * @param regSubtree subtree to send
 * @param batchingTask task that flushes the batched messages to the server
 */
void sendRegistrationSyncSubtree(RegistrationSubtree regSubtree, BatchingTask batchingTask) {
  Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");

  this.batcher.addRegSubtree(regSubtree);
  this.logger.info("Adding subtree: %s", regSubtree);
  batchingTask.ensureScheduled("Send-reg-sync");
}
/**
 * Sends pending data to the server (e.g., registrations, acks, registration sync messages).
 * <p>
 * Nothing is sent while the client is in a server-imposed quiet period, when the batcher cannot
 * produce a message (no token and no pending initialize message), or when building the message
 * fails validation.
 */
void sendMessageToServer() {
  Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");

  // Respect the quiet period requested by the server.
  if (nextMessageSendTimeMs > internalScheduler.getCurrentTimeMs()) {
    logger.warning("In quiet period: not sending message to server: %s > %s",
        nextMessageSendTimeMs, internalScheduler.getCurrentTimeMs());
    return;
  }

  // Ask the batcher to assemble everything that is pending into one message.
  ClientToServerMessage outgoing;
  try {
    boolean hasClientToken = listener.getClientToken() != null;
    outgoing = batcher.toMessage(createClientHeader(), hasClientToken);
  } catch (ProtoWrapper.ValidationArgumentException invalidMessage) {
    logger.severe("Tried to send invalid message: %s", batcher);
    statistics.recordError(ClientErrorType.OUTGOING_MESSAGE_FAILURE);
    return;
  }
  if (outgoing == null) {
    // We have no token and are not sending an initialize message. batcher.toMessage() has
    // already logged this.
    return;
  }

  messageId++;
  statistics.recordSentMessage(SentMessageType.TOTAL);
  logger.fine("Sending message to server: %s", outgoing);
  network.sendMessage(outgoing.toByteArray());

  // Record that the message was sent. The listener is invoked directly instead of through a
  // newly scheduled work unit. Scheduling would be safer, but it is hard to do in Android, we
  // wrote this listener ourselves (it's InvalidationClientCore, so we know what it does), and
  // this is the last line of the function.
  listener.handleMessageSent();
}
/**
 * Returns the header to include on a message to the server. The header carries the protocol
 * version, the listener's current token and registration summary, the current client time, the
 * last known server time, the message id (as a string) and the client type.
 */
private ClientHeader createClientHeader() {
  Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");

  Bytes clientToken = listener.getClientToken();
  RegistrationSummary desiredRegistrations = listener.getRegistrationSummary();
  return ClientHeader.create(
      ClientConstants.PROTOCOL_VERSION,
      clientToken,
      desiredRegistrations,
      internalScheduler.getCurrentTimeMs(),
      lastKnownServerTimeMs,
      Integer.toString(messageId),
      clientType);
}
/**
 * Returns the state of this handler as a protocol buffer: the message id, the last known server
 * time, the next allowed send time and the marshalled state of the batcher.
 */
@Override
public ProtocolHandlerState marshal() {
  return ProtocolHandlerState.create(
      this.messageId,
      this.lastKnownServerTimeMs,
      this.nextMessageSendTimeMs,
      this.batcher.marshal());
}
}