blob: cce4f02dabe0e88d1f204caf697488b152d3afbd [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import datetime
import json
import logging
import os
import time
import webapp2
from google.appengine.api import runtime as apiruntime
from google.appengine.ext import ndb
from infra_libs.ts_mon import shared
from infra_libs.ts_mon.common import distribution
from infra_libs.ts_mon.common import interface
from infra_libs.ts_mon.common import metrics
def find_gaps(num_iter):
  """Generate integers not present in an iterable of integers.

  Caution: this is an infinite generator.
  """
  candidate = 0
  for taken in num_iter:
    # Emit every free integer below the taken one, then skip past it.
    while candidate < taken:
      yield candidate
      candidate += 1
    candidate += 1
  # Input exhausted: everything from here on is free.
  while True:
    yield candidate
    candidate += 1
def _assign_task_num(time_fn=datetime.datetime.utcnow):
  """Assigns task numbers to new instances and deletes expired ones.

  Scans every shared.Instance entity once: entities that have been inactive
  for longer than shared.INSTANCE_EXPIRE_SEC are deleted, and live entities
  without a task_num receive the smallest numbers not currently in use.

  Args:
    time_fn: zero-argument callable returning the current UTC datetime;
      injectable for testing.
  """
  now = time_fn()
  expiry_cutoff = now - datetime.timedelta(seconds=shared.INSTANCE_EXPIRE_SEC)

  expired_keys = []
  unassigned = []
  used_task_nums = []
  for entity in shared.Instance.query():
    # Don't reassign expired task_num right away to avoid races.
    if entity.task_num >= 0:
      used_task_nums.append(entity.task_num)
    # At the same time, don't assign task_num to expired entities.
    if entity.last_updated < expiry_cutoff:
      expired_keys.append(entity.key)
      shared.expired_counter.increment()
      logging.debug(
          'Expiring %s task_num %d, inactive for %s',
          entity.key.id(), entity.task_num,
          now - entity.last_updated)
    elif entity.task_num < 0:
      shared.started_counter.increment()
      unassigned.append(entity)

  logging.debug('Found %d expired and %d unassigned instances',
                len(expired_keys), len(unassigned))

  # Hand out the lowest unused numbers to the unassigned entities.
  used_task_nums.sort()
  for entity, num in zip(unassigned, find_gaps(used_task_nums)):
    entity.task_num = num
    logging.debug('Assigned %s task_num %d', entity.key.id(), num)

  # Commit assignments and deletions concurrently, then wait for both.
  pending = ndb.put_multi_async(unassigned)
  pending += ndb.delete_multi_async(expired_keys)
  ndb.Future.wait_all(pending)
  logging.debug('Committed all changes')
class SendHandler(webapp2.RequestHandler):
  """Cron endpoint that assigns task numbers and flushes global metrics."""

  def get(self):
    # Only requests issued by App Engine cron carry this header value.
    from_cron = self.request.headers.get('X-Appengine-Cron') == 'true'
    if not from_cron:
      self.abort(403)

    with shared.instance_namespace_context():
      _assign_task_num()

    interface.invoke_global_callbacks()
class TSMonJSHandler(webapp2.RequestHandler):
  """Proxy handler for ts_mon metrics collected in JavaScript.

  To use this class:
  1. Subclass it and override self.xsrf_is_valid
  2. After instantiation call self.register_metrics to register global metrics.
  """

  def __init__(self, request=None, response=None):
    super(TSMonJSHandler, self).__init__(request, response)
    # None until register_metrics() is called; afterwards a dict mapping
    # metric name -> metric object.
    self._metrics = None

  def register_metrics(self, metrics_list):
    """Registers ts_mon metrics, required for use.

    Args:
      metrics_list: a list of definitions, from ts_mon.metrics.
    """
    interface.register_global_metrics(metrics_list)
    self._metrics = self._metrics or {}
    for metric in metrics_list:
      if metric.is_cumulative():
        # Allow client-reported absolute values (and start_time) to be
        # written directly into cumulative metrics via set().
        metric.dangerously_enable_cumulative_set()
      self._metrics[metric.name] = metric

  def post(self):
    """POST expects a JSON body that's a dict which includes a key "metrics".

    This key's value is an array of objects with schema:

      {
        "metrics": [{
          "MetricInfo": {
            "Name": "monorail/frontend/float_test",
            "ValueType": 2
          },
          "Cells": [{
            "value": 1,
            "fields": {},
            "start_time": 1538430628174
          }]
        }]
      }

    Responds with status 201 on success, 400 on malformed or unregistered
    input, and 403 when the XSRF check fails.

    Important!
    The user of this library is responsible for validating XSRF tokens via
    implementing the method self.xsrf_is_valid.

    NOTE(review): the example start_time above looks like milliseconds, but
    _start_time_is_valid compares start_time against time.time() (seconds)
    -- confirm which unit clients actually send.
    """
    # Reject early if register_metrics() was never called.
    if not self._metrics:
      self.response.set_status(400)
      self.response.write('No metrics have been registered.')
      logging.warning('gae_ts_mon error: No metrics have been registered.')
      return
    try:
      body = json.loads(self.request.body)
    except ValueError:
      self.response.set_status(400)
      self.response.write('Invalid JSON.')
      logging.warning('gae_ts_mon error: Invalid JSON.')
      return
    # The XSRF hook (subclass-provided) sees the parsed JSON before any
    # further validation of its structure.
    if not self.xsrf_is_valid(body):
      self.response.set_status(403)
      self.response.write('XSRF token invalid.')
      logging.warning('gae_ts_mon error: XSRF token invalid.')
      return
    if not isinstance(body, dict):
      self.response.set_status(400)
      self.response.write('Body must be a dictionary.')
      logging.warning('gae_ts_mon error: Body must be a dictionary.')
      return
    if 'metrics' not in body:
      self.response.set_status(400)
      self.response.write('Key "metrics" must be in body.')
      logging.warning('gae_ts_mon error: Key "metrics" must be in body.')
      logging.warning('Request body: %s', body)
      return
    for metric_measurement in body.get('metrics', []):
      name = metric_measurement['MetricInfo']['Name']
      metric = self._metrics.get(name, None)
      # Only metrics previously passed to register_metrics() are accepted.
      if not metric:
        self.response.set_status(400)
        self.response.write('Metric "%s" is not defined.' % name)
        logging.warning(
            'gae_ts_mon error: Metric "%s" is not defined.', name)
        return
      for cell in metric_measurement.get('Cells', []):
        fields = cell.get('fields', {})
        value = cell.get('value')
        # The supplied field keys must match the metric's field spec exactly.
        metric_field_keys = set(fs.name for fs in metric.field_spec)
        if set(fields.keys()) != metric_field_keys:
          self.response.set_status(400)
          self.response.write('Supplied fields do not match metric "%s".' % name)
          logging.warning(
              'gae_ts_mon error: Supplied fields do not match metric "%s".',
              name)
          logging.warning('Supplied fields keys: %s', fields.keys())
          logging.warning('Metric fields keys: %s', metric_field_keys)
          return
        start_time = cell.get('start_time')
        # Cumulative metrics require a valid, non-zero start_time.
        if metric.is_cumulative() and not start_time:
          self.response.set_status(400)
          self.response.write('Cumulative metrics must have start_time.')
          logging.warning(
              'gae_ts_mon error: Cumulative metrics must have start_time.')
          logging.warning('Metric name: %s', name)
          return
        if metric.is_cumulative() and not self._start_time_is_valid(start_time):
          self.response.set_status(400)
          self.response.write('Invalid start_time: %s.' % start_time)
          logging.warning(
              'gae_ts_mon error: Invalid start_time: %s.', start_time)
          return
        # Convert distribution metric values into Distribution classes.
        if (isinstance(metric, (metrics.CumulativeDistributionMetric,
                                metrics.NonCumulativeDistributionMetric))):
          if not isinstance(value, dict):
            self.response.set_status(400)
            self.response.write('Distribution metric values must be a dict.')
            logging.warning(
                'gae_ts_mon error: Distribution metric values must be a dict.')
            logging.warning('Metric value: %s', value)
            return
          dist_value = distribution.Distribution(bucketer=metric.bucketer)
          dist_value.sum = value.get('sum', 0)
          dist_value.count = value.get('count', 0)
          dist_value.buckets.update(value.get('buckets', {}))
          metric.set(dist_value, fields=fields)
        else:
          metric.set(value, fields=fields)
        if metric.is_cumulative():
          metric.dangerously_set_start_time(start_time)
    # All measurements validated and recorded.
    self.response.set_status(201)
    self.response.write('Ok.')

  def xsrf_is_valid(self, _body):
    """Takes a request body and returns whether the included XSRF token
    is valid.

    This method must be implemented by a subclass.
    """
    raise NotImplementedError('xsrf_is_valid must be implemented in a subclass.')

  def time_fn(self):
    """Defaults to time.time. Can be overridden for testing."""
    return time.time()

  def _start_time_is_valid(self, start_time):
    """Validates that a start_time is not in the future and not
    more than a month in the past.

    Args:
      start_time: a timestamp, compared against self.time_fn() (seconds
        since epoch by default).

    Returns:
      True iff start_time lies in [now - 30 days, now], inclusive.
    """
    now = self.time_fn()
    if start_time > now:
      return False
    one_month_seconds = 60 * 60 * 24 * 30
    one_month_ago = now - one_month_seconds
    if start_time < one_month_ago:
      return False
    return True
def report_memory(application):
  """Wraps an app so handlers log when memory usage increased by at least 0.5MB
  after the handler completed.
  """
  if os.environ.get('SERVER_SOFTWARE', '').startswith('Development'):
    # Skip on dev_appserver; otherwise this fails with:
    # AssertionError: No api proxy found for service "system"
    return  # pragma: no cover

  threshold_mb = 0.5
  inner_dispatch = application.router.dispatch

  def _dispatch_and_report(*args, **kwargs):
    usage_before = apiruntime.runtime.memory_usage().current()
    try:
      return inner_dispatch(*args, **kwargs)
    finally:
      # Sample again after the handler ran, even if it raised.
      usage_after = apiruntime.runtime.memory_usage().current()
      if usage_after >= usage_before + threshold_mb:  # pragma: no cover
        logging.debug(
            'Memory usage: %.1f -> %.1f MB; delta: %.1f MB',
            usage_before, usage_after, usage_after - usage_before)

  application.router.dispatch = _dispatch_and_report
def create_app():
  """Builds the webapp2 WSGI app serving the ts_mon cron endpoint."""
  routes = [
      (r'/internal/cron/ts_mon/send', SendHandler),
  ]
  application = webapp2.WSGIApplication(routes)
  report_memory(application)
  return application


# Module-level WSGI application instance.
app = create_app()