blob: 2f233b50973e41804b53a57bc2d0cb677c6bb7a7 [file] [log] [blame]
// Copyright 2012 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Client for interacting with low-level protocol messages.
#include "google/cacheinvalidation/impl/protocol-handler.h"
#include "google/cacheinvalidation/deps/string_util.h"
#include "google/cacheinvalidation/impl/constants.h"
#include "google/cacheinvalidation/impl/invalidation-client-core.h"
#include "google/cacheinvalidation/impl/log-macro.h"
#include "google/cacheinvalidation/impl/proto-helpers.h"
#include "google/cacheinvalidation/impl/recurring-task.h"
namespace invalidation {
using ::ipc::invalidation::ConfigChangeMessage;
using ::ipc::invalidation::InfoMessage;
using ::ipc::invalidation::InitializeMessage;
using ::ipc::invalidation::InitializeMessage_DigestSerializationType_BYTE_BASED;
using ::ipc::invalidation::InvalidationMessage;
using ::ipc::invalidation::PropertyRecord;
using ::ipc::invalidation::RegistrationMessage;
using ::ipc::invalidation::RegistrationSyncMessage;
using ::ipc::invalidation::ServerHeader;
using ::ipc::invalidation::ServerToClientMessage;
using ::ipc::invalidation::TokenControlMessage;
// Returns a human-readable rendering of this header for logging, of the form
// "Token: <token>, Summary: <summary>".
//
// ParsedMessage::InitFrom passes NULL for the registration summary when the
// server header does not carry one, so either pointer may legitimately be
// NULL here; a missing field is rendered as "<null>" instead of being
// dereferenced.
string ServerMessageHeader::ToString() const {
  string token_str("<null>");
  if (token_ != NULL) {
    token_str = ProtoHelpers::ToString(*token_);
  }

  string summary_str("<null>");
  if (registration_summary_ != NULL) {
    summary_str = ProtoHelpers::ToString(*registration_summary_);
  }

  return StringPrintf("Token: %s, Summary: %s", token_str.c_str(),
                      summary_str.c_str());
}
void ParsedMessage::InitFrom(const ServerToClientMessage& raw_message) {
base_message = raw_message; // Does a deep copy.
// For each field, assign it to the corresponding protobuf field if
// present, else NULL.
header.InitFrom(&base_message.header().client_token(),
base_message.header().has_registration_summary() ?
&base_message.header().registration_summary() : NULL);
token_control_message = base_message.has_token_control_message() ?
&base_message.token_control_message() : NULL;
invalidation_message = base_message.has_invalidation_message() ?
&base_message.invalidation_message() : NULL;
registration_status_message =
base_message.has_registration_status_message() ?
&base_message.registration_status_message() : NULL;
registration_sync_request_message =
base_message.has_registration_sync_request_message() ?
&base_message.registration_sync_request_message() : NULL;
config_change_message = base_message.has_config_change_message() ?
&base_message.config_change_message() : NULL;
info_request_message = base_message.has_info_request_message() ?
&base_message.info_request_message() : NULL;
error_message = base_message.has_error_message() ?
&base_message.error_message() : NULL;
}
// Constructs a protocol handler.
//
// |config| supplies the rate limits handed to the throttle. |resources|
// supplies the logger, internal scheduler, network channel and platform
// string. |statistics| is shared with the batcher. |client_type| and
// |application_name| identify the client; |listener| and |msg_validator| are
// stored for later use. |smearer| is not referenced anywhere in this
// constructor.
ProtocolHandler::ProtocolHandler(
const ProtocolHandlerConfigP& config, SystemResources* resources,
Smearer* smearer, Statistics* statistics, int client_type,
const string& application_name, ProtocolListener* listener,
TiclMessageValidator* msg_validator)
: logger_(resources->logger()),
internal_scheduler_(resources->internal_scheduler()),
network_(resources->network()),
// The throttle receives the configured rate limits, the internal scheduler
// and a permanent callback bound to SendMessageToServer.
// NOTE(review): the callback captures |this| while the object is still being
// constructed; presumably it is never run before construction completes --
// confirm against the throttle implementation.
throttle_(config.rate_limit(), internal_scheduler_,
NewPermanentCallback(this, &ProtocolHandler::SendMessageToServer)),
listener_(listener),
msg_validator_(msg_validator),
// Message ids start at 1; SendMessageToServer increments the counter on each
// send attempt that gets as far as building a message.
message_id_(1),
// Zero means no server time has been seen yet and no quiet period is active.
last_known_server_time_ms_(0),
next_message_send_time_ms_(0),
statistics_(statistics),
batcher_(resources->logger(), statistics),
client_type_(client_type) {
// Fill in client_version_ from the platform string and application name; it
// is later copied into outgoing info messages.
ProtoHelpers::InitClientVersion(resources->platform(), application_name,
&client_version_);
}
// Fills |config| with the default protocol-handler settings: a single rate
// limit allowing at most 3 messages in any 5-second window.
void ProtocolHandler::InitConfig(ProtocolHandlerConfigP* config) {
  const int kWindowMs = 5 * 1000;
  const int kMaxMessagesPerWindow = 3;
  ProtoHelpers::InitRateLimitP(kWindowMs, kMaxMessagesPerWindow,
                               config->add_rate_limit());
}
// Fills |config| with settings intended for tests: a short batching delay and
// two rate limits (one message per second, six messages per minute).
void ProtocolHandler::InitConfigForTest(ProtocolHandlerConfigP* config) {
  // Use a small batching delay so that tests do not wait long for sends.
  const int kBatchingDelayForTestMs = 200;
  config->set_batching_delay_ms(kBatchingDelayForTestMs);

  // Rate limit #1: at most one message per second.
  const int kOneSecondMs = 1000;
  ProtoHelpers::InitRateLimitP(kOneSecondMs, 1, config->add_rate_limit());

  // Rate limit #2: at most six messages per minute.
  const int kOneMinuteMs = 60 * 1000;
  ProtoHelpers::InitRateLimitP(kOneMinuteMs, 6, config->add_rate_limit());
}
// Parses and vets a serialized server-to-client message.
//
// Returns true and fills in |parsed_message| when the message parses, passes
// the message validator, carries a compatible major protocol version and is
// not a config-change message. Returns false (leaving |parsed_message|
// untouched) when:
//   - the bytes cannot be parsed,
//   - the validator rejects the message,
//   - the major protocol version differs from Constants::kProtocolMajorVersion,
//   - the envelope contains a ConfigChangeMessage (its delay, if any, is
//     applied here and the rest of the envelope is ignored).
//
// Side effects: may update next_message_send_time_ms_ and
// last_known_server_time_ms_, and records statistics.
bool ProtocolHandler::HandleIncomingMessage(const string& incoming_message,
                                            ParsedMessage* parsed_message) {
  ServerToClientMessage message;
  // ParseFromString reports both malformed wire data and missing required
  // fields through its return value. Checking only IsInitialized() afterwards
  // would let a corrupt payload with no missing required fields slip through
  // as a partially-populated message.
  if (!message.ParseFromString(incoming_message)) {
    TLOG(logger_, WARNING, "Incoming message is unparseable: %s",
         ProtoHelpers::ToString(incoming_message).c_str());
    return false;
  }

  // Validate the message. If this passes, we can blindly assume valid messages
  // from here on.
  TLOG(logger_, FINE, "Incoming message: %s",
       ProtoHelpers::ToString(message).c_str());
  if (!msg_validator_->IsValid(message)) {
    statistics_->RecordError(
        Statistics::ClientErrorType_INCOMING_MESSAGE_FAILURE);
    TLOG(logger_, SEVERE, "Received invalid message: %s",
         ProtoHelpers::ToString(message).c_str());
    return false;
  }

  // Check the version of the message.
  const ServerHeader& message_header = message.header();
  if (message_header.protocol_version().version().major_version() !=
      Constants::kProtocolMajorVersion) {
    statistics_->RecordError(
        Statistics::ClientErrorType_PROTOCOL_VERSION_FAILURE);
    TLOG(logger_, SEVERE, "Dropping message with incompatible version: %s",
         ProtoHelpers::ToString(message).c_str());
    return false;
  }

  // Check if it is a ConfigChangeMessage which indicates that messages should
  // no longer be sent for a certain duration. Perform this check before the
  // token is even checked.
  if (message.has_config_change_message()) {
    const ConfigChangeMessage& config_change_msg =
        message.config_change_message();
    statistics_->RecordReceivedMessage(
        Statistics::ReceivedMessageType_CONFIG_CHANGE);
    if (config_change_msg.has_next_message_delay_ms()) {
      // Validator has ensured that it is positive.
      next_message_send_time_ms_ =
          GetCurrentTimeMs() + config_change_msg.next_message_delay_ms();
    }
    return false;  // Ignore all other messages in the envelope.
  }

  // Track the largest server time seen so far; it is echoed back to the
  // server in outgoing client headers.
  if (message_header.server_time_ms() > last_known_server_time_ms_) {
    last_known_server_time_ms_ = message_header.server_time_ms();
  }
  parsed_message->InitFrom(message);
  return true;
}
// Decides whether a message carrying |server_token| should be accepted.
//
// Returns true when the client has no token yet (there is nothing to compare
// against; the upper layer is expected to perform the appropriate checks on
// any token control message) or when the tokens match. Returns false, after
// logging and recording a TOKEN_MISMATCH error, otherwise. Must be called on
// the internal scheduler thread.
bool ProtocolHandler::CheckServerToken(const string& server_token) {
  CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";

  const string& client_token = listener_->GetClientToken();
  const bool no_token_yet = client_token.empty();
  if (no_token_yet || client_token == server_token) {
    return true;
  }

  // Token mismatch: reject the whole message. The channel can deliver
  // messages meant for other clients of the same user, so this is logged at
  // INFO rather than anything louder.
  TLOG(logger_, INFO, "Incoming message has bad token: %s, %s",
       ProtoHelpers::ToString(client_token).c_str(),
       ProtoHelpers::ToString(server_token).c_str());
  statistics_->RecordError(Statistics::ClientErrorType_TOKEN_MISMATCH);
  return false;
}
// Queues an initialize message for |application_client_id| with |nonce| and
// makes sure |batching_task| is scheduled so that it gets sent.
//
// |debug_string| is included in the log line and passed to the batching task
// as the scheduling reason. Must be called on the internal scheduler thread.
void ProtocolHandler::SendInitializeMessage(
    const ApplicationClientIdP& application_client_id,
    const string& nonce,
    BatchingTask* batching_task,
    const string& debug_string) {
  CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";

  if (application_client_id.client_type() != client_type_) {
    // This condition is not fatal, but it probably represents a bug somewhere
    // if it occurs.
    // client_type_ is an integer, so it must be formatted with %d; passing it
    // to a %s conversion is undefined behavior.
    TLOG(logger_, WARNING, "Client type in application id does not match "
         "constructor-provided type: %s vs %d",
         ProtoHelpers::ToString(application_client_id).c_str(), client_type_);
  }

  // Build the message and hand it to the batcher; it is actually sent when
  // the batching task runs.
  // NOTE(review): the batcher presumably takes ownership of the heap-allocated
  // message -- confirm against Batcher::SetInitializeMessage.
  InitializeMessage* message = new InitializeMessage();
  ProtoHelpers::InitInitializeMessage(application_client_id, nonce, message);
  TLOG(logger_, INFO, "Batching initialize message for client: %s, %s",
       debug_string.c_str(),
       ProtoHelpers::ToString(*message).c_str());
  batcher_.SetInitializeMessage(message);
  batching_task->EnsureScheduled(debug_string);
}
void ProtocolHandler::SendInfoMessage(
const vector<pair<string, int> >& performance_counters,
ClientConfigP* client_config,
bool request_server_registration_summary,
BatchingTask* batching_task) {
CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
// Simply store the message in pending_info_message_ and send it
// when the batching task runs.
InfoMessage* message = new InfoMessage();
message->mutable_client_version()->CopyFrom(client_version_);
// Add configuration parameters.
if (client_config != NULL) {
message->mutable_client_config()->CopyFrom(*client_config);
}
// Add performance counters.
for (size_t i = 0; i < performance_counters.size(); ++i) {
PropertyRecord* counter = message->add_performance_counter();
counter->set_name(performance_counters[i].first);
counter->set_value(performance_counters[i].second);
}
// Indicate whether we want the server's registration summary sent back.
message->set_server_registration_summary_requested(
request_server_registration_summary);
TLOG(logger_, INFO, "Batching info message for client: %s",
ProtoHelpers::ToString(*message).c_str());
batcher_.SetInfoMessage(message);
batching_task->EnsureScheduled("Send-info");
}
void ProtocolHandler::SendRegistrations(
const vector<ObjectIdP>& object_ids,
RegistrationP::OpType reg_op_type,
BatchingTask* batching_task) {
CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
for (size_t i = 0; i < object_ids.size(); ++i) {
batcher_.AddRegistration(object_ids[i], reg_op_type);
}
batching_task->EnsureScheduled("Send-registrations");
}
// Queues an acknowledgement for |invalidation| and makes sure |batching_task|
// is scheduled. Must be called on the internal scheduler thread.
void ProtocolHandler::SendInvalidationAck(const InvalidationP& invalidation,
                                          BatchingTask* batching_task) {
  CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";

  // Acks are queued one by one. Summarizing them in the presence of
  // suppressing invalidations would be possible, but is unlikely to pay off
  // here, so it is not attempted.
  batcher_.AddAck(invalidation);
  batching_task->EnsureScheduled("Send-ack");
}
void ProtocolHandler::SendRegistrationSyncSubtree(
const RegistrationSubtree& reg_subtree,
BatchingTask* batching_task) {
CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
TLOG(logger_, INFO, "Adding subtree: %s",
ProtoHelpers::ToString(reg_subtree).c_str());
batcher_.AddRegSubtree(reg_subtree);
batching_task->EnsureScheduled("Send-reg-sync");
}
void ProtocolHandler::SendMessageToServer() {
CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
if (next_message_send_time_ms_ > GetCurrentTimeMs()) {
TLOG(logger_, WARNING, "In quiet period: not sending message to server: "
"%s > %s",
SimpleItoa(next_message_send_time_ms_).c_str(),
SimpleItoa(GetCurrentTimeMs()).c_str());
return;
}
const bool has_client_token(!listener_->GetClientToken().empty());
ClientToServerMessage builder;
if (!batcher_.ToBuilder(&builder, has_client_token)) {
TLOG(logger_, WARNING, "Unable to build message");
return;
}
ClientHeader* outgoing_header = builder.mutable_header();
InitClientHeader(outgoing_header);
// Validate the message and send it.
++message_id_;
if (!msg_validator_->IsValid(builder)) {
TLOG(logger_, SEVERE, "Tried to send invalid message: %s",
ProtoHelpers::ToString(builder).c_str());
statistics_->RecordError(
Statistics::ClientErrorType_OUTGOING_MESSAGE_FAILURE);
return;
}
TLOG(logger_, FINE, "Sending message to server: %s",
ProtoHelpers::ToString(builder).c_str());
statistics_->RecordSentMessage(Statistics::SentMessageType_TOTAL);
string serialized;
builder.SerializeToString(&serialized);
network_->SendMessage(serialized);
// Record that the message was sent. We do this inline to match what the
// Java Ticl, which is constrained by Android requirements, does.
listener_->HandleMessageSent();
}
// Fills in |builder|, the header of an outgoing client-to-server message:
// protocol version, client time, message id, highest known server time,
// client type, registration summary and, when one is available, the client
// token. Must be called on the internal scheduler thread.
void ProtocolHandler::InitClientHeader(ClientHeader* builder) {
  CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";

  // Fields that come from this handler's own state.
  ProtoHelpers::InitProtocolVersion(builder->mutable_protocol_version());
  builder->set_client_time_ms(GetCurrentTimeMs());
  const string message_id_text = StringPrintf("%d", message_id_);
  builder->set_message_id(message_id_text);
  builder->set_max_known_server_time_ms(last_known_server_time_ms_);
  builder->set_client_type(client_type_);

  // Fields supplied by the listener.
  listener_->GetRegistrationSummary(builder->mutable_registration_summary());
  const string& token = listener_->GetClientToken();
  if (token.empty()) {
    return;  // No token yet, so the header goes out without one.
  }
  TLOG(logger_, FINE, "Sending token on client->server message: %s",
       ProtoHelpers::ToString(token).c_str());
  builder->set_client_token(token);
}
// Moves every pending batched item into |builder| and clears the batcher's
// corresponding state.
//
// Returns false (after logging and recording a TOKEN_MISSING_FAILURE error)
// when the client has no token and no initialize message is being sent, since
// nothing else may be sent in that state. Returns true otherwise. A pending
// initialize message is consumed in either case.
bool Batcher::ToBuilder(ClientToServerMessage* builder, bool has_client_token) {
  // Initialize message, if one is waiting.
  if (pending_initialize_message_.get() != NULL) {
    statistics_->RecordSentMessage(Statistics::SentMessageType_INITIALIZE);
    builder->mutable_initialize_message()->CopyFrom(
        *pending_initialize_message_);
    pending_initialize_message_.reset();
  }

  // Other messages (registrations etc.) may accompany an initialize message,
  // but without a token AND without an initialize message nothing at all can
  // be sent.
  const bool can_send =
      has_client_token || builder->has_initialize_message();
  if (!can_send) {
    TLOG(logger_, WARNING,
         "Cannot send message since no token and no initialize msg: %s",
         ProtoHelpers::ToString(*builder).c_str());
    statistics_->RecordError(Statistics::ClientErrorType_TOKEN_MISSING_FAILURE);
    return false;
  }

  // Invalidation acks (the helper clears the pending set).
  if (!pending_acked_invalidations_.empty()) {
    InitAckMessage(builder->mutable_invalidation_ack_message());
    statistics_->RecordSentMessage(
        Statistics::SentMessageType_INVALIDATION_ACK);
  }

  // Registrations (the helper clears the pending map).
  if (!pending_registrations_.empty()) {
    InitRegistrationMessage(builder->mutable_registration_message());
    statistics_->RecordSentMessage(Statistics::SentMessageType_REGISTRATION);
  }

  // Registration subtrees.
  if (!pending_reg_subtrees_.empty()) {
    typedef set<RegistrationSubtree, ProtoCompareLess>::const_iterator
        SubtreeIterator;
    RegistrationSyncMessage* sync = builder->mutable_registration_sync_message();
    for (SubtreeIterator subtree = pending_reg_subtrees_.begin();
         subtree != pending_reg_subtrees_.end(); ++subtree) {
      sync->add_subtree()->CopyFrom(*subtree);
    }
    pending_reg_subtrees_.clear();
    statistics_->RecordSentMessage(
        Statistics::SentMessageType_REGISTRATION_SYNC);
  }

  // Info message, if one is waiting.
  if (pending_info_message_.get() != NULL) {
    statistics_->RecordSentMessage(Statistics::SentMessageType_INFO);
    builder->mutable_info_message()->CopyFrom(*pending_info_message_);
    pending_info_message_.reset();
  }

  return true;
}
// Appends one registration entry to |reg_message| for every pending
// (object id, operation type) pair, then empties the pending map. The caller
// must ensure that at least one registration is pending.
void Batcher::InitRegistrationMessage(RegistrationMessage* reg_message) {
  CHECK(!pending_registrations_.empty());

  typedef map<ObjectIdP, RegistrationP::OpType, ProtoCompareLess>::iterator
      RegistrationIterator;
  RegistrationIterator entry = pending_registrations_.begin();
  while (entry != pending_registrations_.end()) {
    ProtoHelpers::InitRegistrationP(entry->first, entry->second,
                                    reg_message->add_registration());
    ++entry;
  }
  pending_registrations_.clear();
}
// Copies every pending acknowledged invalidation into |ack_message|, then
// empties the pending set. The caller must ensure that at least one ack is
// pending.
void Batcher::InitAckMessage(InvalidationMessage* ack_message) {
  CHECK(!pending_acked_invalidations_.empty());

  typedef set<InvalidationP, ProtoCompareLess>::iterator AckIterator;
  AckIterator ack = pending_acked_invalidations_.begin();
  while (ack != pending_acked_invalidations_.end()) {
    ack_message->add_invalidation()->CopyFrom(*ack);
    ++ack;
  }
  pending_acked_invalidations_.clear();
}
} // namespace invalidation