blob: fa84fc870cda307dd08472592740303c7d61e1ea [file] [log] [blame]
// Copyright 2012 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Throttles calls to a function.
#include "google/cacheinvalidation/impl/throttle.h"
#include <algorithm>
#include "google/cacheinvalidation/include/system-resources.h"
#include "google/cacheinvalidation/deps/callback.h"
namespace invalidation {
using INVALIDATION_STL_NAMESPACE::max;
Throttle::Throttle(
const RepeatedPtrField<RateLimitP>& rate_limits, Scheduler* scheduler,
Closure* listener)
: rate_limits_(rate_limits), scheduler_(scheduler), listener_(listener),
timer_scheduled_(false) {
// Find the largest 'count' in all of the rate limits, as this is the size of
// the buffer of recent messages we need to retain.
max_recent_events_ = 1;
for (size_t i = 0; i < static_cast<size_t>(rate_limits_.size()); ++i) {
const RateLimitP& rate_limit = rate_limits.Get(i);
CHECK(rate_limit.window_ms() > rate_limit.count()) <<
"Windows size too small";
max_recent_events_ = max(static_cast<int>(max_recent_events_),
rate_limits_.Get(i).count());
}
}
void Throttle::Fire() {
if (timer_scheduled_) {
// We're already rate-limited and have a deferred call scheduled. Just
// return. The flag will be reset when the deferred task runs.
return;
}
// Go through all of the limits to see if we've hit one. If so, schedule a
// task to try again once that limit won't be violated. If no limits would be
// violated, send.
Time now = scheduler_->GetCurrentTime();
for (size_t i = 0; i < static_cast<size_t>(rate_limits_.size()); ++i) {
RateLimitP rate_limit = rate_limits_.Get(i);
// We're now checking whether sending would violate a rate limit of 'count'
// messages per 'window_size'.
int count = rate_limit.count();
TimeDelta window_size = TimeDelta::FromMilliseconds(rate_limit.window_ms());
// First, see how many messages we've sent so far (up to the size of our
// recent message buffer).
int num_recent_messages = recent_event_times_.size();
// Check whether we've sent enough messages yet that we even need to
// consider this rate limit.
if (num_recent_messages >= count) {
// If we've sent enough messages to reach this limit, see how long ago we
// sent the first message in the interval, and add sufficient delay to
// avoid violating the rate limit.
// We have sent at least 'count' messages. See how long ago we sent the
// 'count'-th last message. This defines the start of a window in which
// no more than 'count' messages may be sent.
Time window_start = recent_event_times_[num_recent_messages - count];
// The end of this window is 'window_size' after the start.
Time window_end = window_start + window_size;
// Check where the end of the window is relative to the current time. If
// the end of the window is in the future, then sending now would violate
// the rate limit, so we must defer.
TimeDelta window_end_from_now = window_end - now;
if (window_end_from_now > TimeDelta::FromSeconds(0)) {
// Rate limit would be violated, so schedule a task to try again.
// Set the flag to indicate we have a deferred task scheduled. No need
// to continue checking other rate limits now.
timer_scheduled_ = true;
scheduler_->Schedule(
window_end_from_now,
NewPermanentCallback(this, &Throttle::RetryFire));
return;
}
}
}
// We checked all the rate limits, and none would have been violated, so it's
// safe to call the listener.
listener_->Run();
// Record the fact that we're triggering an event now.
recent_event_times_.push_back(scheduler_->GetCurrentTime());
// Only save up to max_recent_events_ event times.
if (recent_event_times_.size() > max_recent_events_) {
recent_event_times_.pop_front();
}
}
} // namespace invalidation