blob: 9d7de2ae6ddc9eee1813ab4826d278718d277671 [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import datetime
import logging
import os
import six
from google.appengine.api import runtime as apiruntime
from google.appengine.ext import ndb
from infra_libs.ts_mon import shared
def find_gaps(num_iter):
"""Generate integers not present in an iterable of integers.
Caution: this is an infinite generator.
"""
next_num = -1
for n in num_iter:
next_num += 1
while next_num < n:
yield next_num
next_num += 1
while True:
next_num += 1
yield next_num
def _assign_task_num(time_fn=datetime.datetime.utcnow):
expired_keys = []
unassigned = []
used_task_nums = []
time_now = time_fn()
expired_time = time_now - datetime.timedelta(
seconds=shared.INSTANCE_EXPIRE_SEC)
for entity in shared.Instance.query():
# Don't reassign expired task_num right away to avoid races.
if entity.task_num >= 0:
used_task_nums.append(entity.task_num)
# At the same time, don't assign task_num to expired entities.
if entity.last_updated < expired_time:
expired_keys.append(entity.key)
shared.expired_counter.increment()
logging.debug(
'Expiring %s task_num %d, inactive for %s',
entity.key.id(), entity.task_num,
time_now - entity.last_updated)
elif entity.task_num < 0:
shared.started_counter.increment()
unassigned.append(entity)
logging.debug('Found %d expired and %d unassigned instances',
len(expired_keys), len(unassigned))
used_task_nums = sorted(used_task_nums)
for entity, task_num in zip(unassigned, find_gaps(used_task_nums)):
entity.task_num = task_num
logging.debug('Assigned %s task_num %d', entity.key.id(), task_num)
futures_unassigned = ndb.put_multi_async(unassigned)
futures_expired = ndb.delete_multi_async(expired_keys)
ndb.Future.wait_all(futures_unassigned + futures_expired)
logging.debug('Committed all changes')
def report_memory(handler):
"""Wraps an app so handlers log when memory usage increased by at least 0.5MB
after the handler completed.
"""
if os.environ.get('SERVER_SOFTWARE', '').startswith('Development'):
# This is to detect "dev_appserver" environment and skip report_memory().
# report_memory() fails with the following exception in dev_appserver.
# :AssertionError: No api proxy found for service "system"
return handler # pragma: no cover
min_delta = 0.5
def dispatch_and_report(*args, **kwargs):
if six.PY2: # pragma: no cover
before = apiruntime.runtime.memory_usage().current()
else: # pragma: no cover
before = apiruntime.runtime.memory_usage().current
try:
return handler(*args, **kwargs)
finally:
if six.PY2: # pragma: no cover
after = apiruntime.runtime.memory_usage().current()
else: # pragma: no cover
after = apiruntime.runtime.memory_usage().current
if after >= before + min_delta: # pragma: no cover
logging.debug('Memory usage: %.1f -> %.1f MB; delta: %.1f MB', before,
after, after - before)
return dispatch_and_report