blob: e750fff3c4ea9afabf8d988d52d7a9c7743878f8 [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import copy
import datetime
import functools
import logging
import os
import sys
import time
import threading
# Not all apps enable endpoints. If the import fails, the app will not
# use @instrument_endpoint() decorator, so it is safe to ignore it.
try:
import endpoints
except ImportError: # pragma: no cover
pass
import webapp2
from google.appengine.api import modules
from google.appengine.api.app_identity import app_identity
from google.appengine.api import runtime
from google.appengine.ext import ndb
from infra_libs.ts_mon import handlers
from infra_libs.ts_mon import shared
from infra_libs.ts_mon.common import http_metrics
from infra_libs.ts_mon.common import interface
from infra_libs.ts_mon.common import metric_store
from infra_libs.ts_mon.common import monitors
from infra_libs.ts_mon.common import standard_metrics
from infra_libs.ts_mon.common import targets
def _reset_cumulative_metrics():
  """Reset all cumulative metrics for an instance that lost its task_num.

  Invoked when this instance's Datastore entity was purged while the process
  is still running; stale cumulative values must not be re-reported under a
  newly assigned task_num.
  """
  logging.warning('Instance %s got purged from Datastore, but is still alive. '
                  'Clearing cumulative metrics.', shared.instance_key_id())
  cumulative = (m for _, m, _, _, _ in interface.state.store.get_all()
                if m.is_cumulative())
  for m in cumulative:
    m.reset()
# Serializes the check-and-update of interface.state.last_flushed so that at
# most one thread wins the right to flush per interval.
_flush_metrics_lock = threading.Lock()


def need_to_flush_metrics(time_now):
  """Check if metrics need flushing, and update the timestamp of last flush.

  Even though the caller of this function may not successfully flush the
  metrics, we still update the last_flushed timestamp to prevent too much work
  being done in user requests.

  Also, this check-and-update has to happen atomically, to ensure only one
  thread can flush metrics at a time.
  """
  if not interface.state.flush_enabled_fn():
    return False
  now = datetime.datetime.utcfromtimestamp(time_now)
  threshold = now - datetime.timedelta(seconds=60)
  with _flush_metrics_lock:
    # A flush is due once the last one is at least a minute old.
    due = interface.state.last_flushed <= threshold
    if due:
      interface.state.last_flushed = now
  return due
def flush_metrics_if_needed(time_now):
  """Flush metrics if the flush interval has elapsed.

  Returns True only when metrics were actually sent; False when flushing was
  not due, is disabled, or ts_mon is not configured.
  """
  # Short-circuit: _flush_metrics runs only when this caller won the
  # atomic check in need_to_flush_metrics.
  return need_to_flush_metrics(time_now) and _flush_metrics(time_now)
def _flush_metrics(time_now):
  """Return True if metrics were actually sent."""
  if interface.state.target is None:
    # ts_mon is not configured.
    return False
  datetime_now = datetime.datetime.utcfromtimestamp(time_now)
  entity = shared.get_instance_entity()
  if entity.task_num < 0:
    # This instance has no task_num assigned (yet), so its metrics cannot be
    # attributed to a task and must not be sent.
    if interface.state.target.task_num >= 0:
      # We previously had a task_num but the entity lost it — the entity was
      # purged and recreated, so cumulative metric state is stale.
      _reset_cumulative_metrics()
    interface.state.target.task_num = -1
    interface.state.last_flushed = entity.last_updated
    updated_sec_ago = (datetime_now - entity.last_updated).total_seconds()
    if updated_sec_ago > shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC:
      # The assigner cron should have handed out a task_num by now; warn so
      # the missing assignment is visible in logs.
      logging.warning('Instance %s is %d seconds old with no task_num.',
                      shared.instance_key_id(), updated_sec_ago)
    return False
  interface.state.target.task_num = entity.task_num
  # Refresh the liveness timestamp asynchronously while we flush.
  entity.last_updated = datetime_now
  entity_deferred = entity.put_async()
  interface.flush()
  # Global metrics are reported once per flush; reset them so stale values
  # are not re-sent by a task that doesn't recompute them.
  for metric in interface.state.global_metrics.itervalues():
    metric.reset()
  # Block until the entity write completes before reporting success.
  entity_deferred.get_result()
  return True
def _shutdown_hook(time_fn=time.time):
  """Final best-effort flush and Instance entity cleanup at shutdown.

  Registered with runtime.set_shutdown_hook in initialize(); time_fn is
  injectable for tests.
  """
  shared.shutdown_counter.increment()
  flushed = flush_metrics_if_needed(time_fn())
  if flushed:
    logging.info('Shutdown hook: deleting %s, metrics were flushed.',
                 shared.instance_key_id())
  else:
    logging.warning('Shutdown hook: deleting %s, metrics were NOT flushed.',
                    shared.instance_key_id())
  # Delete this instance's entity so its task_num can be recycled.
  with shared.instance_namespace_context():
    instance_key = ndb.Key(
        shared.Instance._get_kind(), shared.instance_key_id())
    instance_key.delete()
def _internal_callback():
  """Global-metrics callback: report each module's default version."""
  for module_name in modules.get_modules():
    default_version = modules.get_default_version(module_name)
    shared.appengine_default_version.set(
        default_version,
        target_fields={
            'task_num': 0,
            'hostname': '',
            'job_name': module_name,
        })
def initialize(app=None, is_enabled_fn=None, cron_module='default',
               is_local_unittest=None):
  """Instruments webapp2 `app` with gae_ts_mon metrics.

  Instruments all the endpoints in `app` with basic metrics.

  Args:
    app (webapp2 app): the app to instrument.
    is_enabled_fn (function or None): a function returning bool if ts_mon should
      send the actual metrics. None (default) is equivalent to lambda: True.
      This allows apps to turn monitoring on or off dynamically, per app.
    cron_module (str): the name of the module handling the
      /internal/cron/ts_mon/send endpoint. This allows moving the cron job
      to any module the user wants.
    is_local_unittest (bool or None): whether we are running in a unittest.
  """
  if is_local_unittest is None:  # pragma: no cover
    # Since gae_ts_mon.initialize is called at module-scope by appengine apps,
    # AppengineTestCase.setUp() won't have run yet and none of the appengine
    # stubs will be initialized, so accessing Datastore or even getting the
    # application ID will fail.
    is_local_unittest = ('expect_tests' in sys.argv[0])
  if is_enabled_fn is not None:
    interface.state.flush_enabled_fn = is_enabled_fn
  if app is not None:
    instrument_wsgi_application(app)
    # The module serving the cron endpoint also hosts the ts_mon handlers;
    # instrument those requests as well.
    if is_local_unittest or modules.get_current_module_name() == cron_module:
      instrument_wsgi_application(handlers.app)
  # Use the application ID as the service name and the module name as the job
  # name.
  if is_local_unittest:  # pragma: no cover
    service_name = 'unittest'
    job_name = 'unittest'
    hostname = 'unittest'
  else:
    service_name = app_identity.get_application_id()
    job_name = modules.get_current_module_name()
    hostname = modules.get_current_version_name()
    # Flush one last time and clean up our Instance entity when the runtime
    # shuts this instance down.
    runtime.set_shutdown_hook(_shutdown_hook)
  # task_num=-1 means "not yet assigned"; the cron assigner hands out real
  # task numbers via Datastore (see _flush_metrics).
  interface.state.target = targets.TaskTarget(
      service_name, job_name, shared.REGION, hostname, task_num=-1)
  interface.state.flush_mode = 'manual'
  interface.state.last_flushed = datetime.datetime.utcnow()
  # Don't send metrics when running on the dev appserver.
  if (is_local_unittest or
      os.environ.get('SERVER_SOFTWARE', '').startswith('Development')):
    logging.info('Using debug monitor')
    interface.state.global_monitor = monitors.DebugMonitor()
  else:
    logging.info('Using https monitor %s with %s', shared.PRODXMON_ENDPOINT,
                 shared.PRODXMON_SERVICE_ACCOUNT_EMAIL)
    interface.state.global_monitor = monitors.HttpsMonitor(
        shared.PRODXMON_ENDPOINT,
        monitors.DelegateServiceAccountCredentials(
            shared.PRODXMON_SERVICE_ACCOUNT_EMAIL,
            monitors.AppengineCredentials()))
    interface.state.use_new_proto = True
  interface.register_global_metrics([shared.appengine_default_version])
  interface.register_global_metrics_callback(
      shared.INTERNAL_CALLBACK_NAME, _internal_callback)
  # We invoke global callbacks once for the whole application in the cron
  # handler. Leaving this set to True would invoke them once per task.
  interface.state.invoke_global_callbacks_on_flush = False
  standard_metrics.init()
  logging.info('Initialized ts_mon with service_name=%s, job_name=%s, '
               'hostname=%s', service_name, job_name, hostname)
def _instrumented_dispatcher(dispatcher, request, response, time_fn=time.time):
  """Wraps a webapp2 dispatcher call, recording ts_mon HTTP metrics.

  Also piggybacks the periodic metrics flush on this request: when a flush
  is due, it runs on a background thread that is joined before metrics for
  this request are reported.
  """
  start_time = time_fn()
  response_status = 0
  flush_thread = None
  time_now = time_fn()
  if need_to_flush_metrics(time_now):
    # Overlap the flush with the request's own work.
    flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
    flush_thread.start()
  try:
    ret = dispatcher(request, response)
  except webapp2.HTTPException as ex:
    # webapp2 HTTP errors carry the status to report.
    response_status = ex.code
    raise
  except Exception:
    response_status = 500
    raise
  else:
    # A dispatcher may return a new Response instead of mutating `response`.
    if isinstance(ret, webapp2.Response):
      response = ret
    response_status = response.status_int
  finally:
    # Metrics are reported on both success and failure paths.
    if flush_thread:
      flush_thread.join()
    elapsed_ms = int((time_fn() - start_time) * 1000)
    # Use the route template regex, not the request path, to prevent an
    # explosion in possible field values.
    name = request.route.template if request.route is not None else ''
    http_metrics.update_http_server_metrics(
        name, response_status, elapsed_ms,
        request_size=request.content_length,
        response_size=response.content_length,
        user_agent=request.user_agent)
  return ret
def instrument_wsgi_application(app, time_fn=time.time):
  """Wraps `app`'s router dispatcher so requests record ts_mon metrics.

  Idempotent: instrumenting the same app again is a no-op.
  """
  # Don't instrument the same router twice.
  if hasattr(app.router, '__instrumented_by_ts_mon'):
    return
  original_dispatch = app.router.dispatch

  def _dispatch(router, request, response):
    return _instrumented_dispatcher(
        original_dispatch, request, response, time_fn=time_fn)

  app.router.set_dispatcher(_dispatch)
  app.router.__instrumented_by_ts_mon = True
def instrument_endpoint(time_fn=time.time):
  """Decorator to instrument Cloud Endpoint methods."""
  def decorator(fn):
    method_name = fn.__name__
    assert method_name
    @functools.wraps(fn)
    def decorated(service, *args, **kwargs):
      # Metric name mirrors the SPI dispatch path for this endpoint method.
      service_name = service.__class__.__name__
      endpoint_name = '/_ah/spi/%s.%s' % (service_name, method_name)
      start_time = time_fn()
      response_status = 0
      flush_thread = None
      time_now = time_fn()
      # Piggyback the periodic flush on this request, overlapping it with
      # the endpoint's own work on a background thread.
      if need_to_flush_metrics(time_now):
        flush_thread = threading.Thread(target=_flush_metrics, args=(time_now,))
        flush_thread.start()
      try:
        ret = fn(service, *args, **kwargs)
        response_status = 200
        return ret
      except endpoints.ServiceException as e:
        # Endpoint-declared errors carry their own HTTP status.
        response_status = e.http_status
        raise
      except Exception:
        response_status = 500
        raise
      finally:
        # Report metrics on both success and failure; join first so the
        # flush does not outlive the request.
        if flush_thread:
          flush_thread.join()
        elapsed_ms = int((time_fn() - start_time) * 1000)
        http_metrics.update_http_server_metrics(
            endpoint_name, response_status, elapsed_ms)
    return decorated
  return decorator
class DjangoMiddleware(object):
  """Django middleware that records ts_mon HTTP metrics for each view.

  Also piggybacks the periodic metrics flush on user requests: when a flush
  is due it is started on a background thread in process_view and joined in
  process_response.
  """

  # Attribute name stashed on the request object to carry per-request state
  # (view name, start time, flush thread) to process_response.
  STATE_ATTR = 'ts_mon_state'

  def __init__(self, time_fn=time.time):
    # time_fn is injectable for tests.
    self._time_fn = time_fn

  def _callable_name(self, fn):
    """Returns a dotted module-qualified name for the view callable."""
    if hasattr(fn, 'im_class') and hasattr(fn, 'im_func'):  # Bound method.
      return '.'.join([
          fn.im_class.__module__,
          fn.im_class.__name__,
          fn.im_func.func_name])
    if hasattr(fn, '__name__'):  # Function.
      return fn.__module__ + '.' + fn.__name__
    return '<unknown>'  # pragma: no cover

  def process_view(self, request, view_func, view_args, view_kwargs):
    """Records the start time and kicks off a background flush if due."""
    time_now = self._time_fn()
    state = {
        'flush_thread': None,
        'name': self._callable_name(view_func),
        'start_time': time_now,
    }
    if need_to_flush_metrics(time_now):
      thread = threading.Thread(target=_flush_metrics, args=(time_now,))
      thread.start()
      state['flush_thread'] = thread
    setattr(request, self.STATE_ATTR, state)
    # None tells Django to continue processing the view as usual.
    return None

  def process_response(self, request, response):
    """Reports request metrics; waits for any in-flight flush to finish."""
    try:
      state = getattr(request, self.STATE_ATTR)
    except AttributeError:
      # process_view never ran for this request (e.g. an earlier middleware
      # short-circuited), so there is nothing to report.
      return response
    if state['flush_thread'] is not None:
      state['flush_thread'].join()
    duration_secs = self._time_fn() - state['start_time']
    request_size = 0
    if hasattr(request, 'body'):
      request_size = len(request.body)
    response_size = 0
    if hasattr(response, 'content'):
      # NOTE(review): streaming responses have no `content`; size stays 0.
      response_size = len(response.content)
    http_metrics.update_http_server_metrics(
        state['name'],
        response.status_code,
        duration_secs * 1000,
        request_size=request_size,
        response_size=response_size,
        user_agent=request.META.get('HTTP_USER_AGENT', None))
    return response
def reset_for_unittest(disable=False):
  """Resets ts_mon's global state between unit tests.

  Forwards to interface.reset_for_unittest; `disable` is passed through
  unchanged.
  """
  interface.reset_for_unittest(disable=disable)