# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Testable functions for Log."""
# Note: don't mind the littering of `pragma: no cover` statements in this
# file. Most of the file is actually covered; they are here because of
# phantom branch misses.
import datetime
import logging
import os
import sys
import time

import httplib2
import oauth2client.client

from googleapiclient import discovery
from oauth2client.client import OAuth2WebServerFlow

# https://chromium.googlesource.com/infra/infra/+/master/infra_libs/logs/README.md
LOGGER = logging.getLogger(__name__)
# All Chrome-related logs go here.
PROJECT_ID = 'chrome-infra-logs'
# We put everything under this service name.
SERVICE_NAME = 'compute.googleapis.com'
# All of the relevant tables are in this dataset.
DATASET_ID = 'logs'
CACHE_FILE = os.path.join(
    os.path.expanduser('~'), '.config', 'chrome_infra', 'auth',
    'cloud_logging.json')
CAT_QUERY = """
SELECT timestamp, textPayload, labels.cloudtail_resource_id
FROM TABLE_DATE_RANGE(
%s.%%s_,
DATE_ADD(CURRENT_TIMESTAMP(), %%d, 'DAY'),
DATE_ADD(CURRENT_TIMESTAMP(), %%d, 'DAY')),
""" % DATASET_ID
LIST_QUERY = """
SELECT labels.cloudtail_resource_id
FROM TABLE_DATE_RANGE(
%s.%%s_,
DATE_ADD(CURRENT_TIMESTAMP(), %%d, 'DAY'),
DATE_ADD(CURRENT_TIMESTAMP(), %%d, 'DAY')),
GROUP BY labels.cloudtail_resource_id
""" % DATASET_ID


class LogQuery(object):
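  """Queries the log tables in the chrome-infra-logs BigQuery dataset.

  days_from and days_until are day offsets relative to the current timestamp
  (e.g. days_from=-1, days_until=0 covers roughly the last day); limit caps
  the number of rows cat() fetches.
  """
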
  def __init__(self, project, service, limit, days_from, days_until):
    self._api = None
    self.project = project
    self.service = service
    self.limit = limit
    self.days_from = days_from
    self.days_until = days_until
    self._bq = None
    self._http = None
    self._creds = None

  @staticmethod
  def _auth():  # pragma: no cover
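    """Runs the OAuth2 out-of-band flow and caches the credentials on disk."""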
    flow = OAuth2WebServerFlow(
        # This belongs to the chrome-infra-logs project.
        client_id='410962439126-g1t7r572gam70qcehd49a1n5gq1lj814'
                  '.apps.googleusercontent.com',
        # This is not actually a secret; it's more like an API key.
        client_secret='VqOcmdXVRIgMekbFfu2eDzEL',
        scope=['https://www.googleapis.com/auth/bigquery'],
        # For simplicity, the local webserver flow is disabled. It's a flaky
        # flow anyway: if you have multiple Chrome profiles, which profile
        # gets chosen is anyone's guess.
        redirect_uri=oauth2client.client.OOB_CALLBACK_URN)
    authorize_url = flow.step1_get_authorize_url()
    print 'Go to URL: %s' % authorize_url
    code = raw_input('Enter verification code: ').strip()
    creds = flow.step2_exchange(code)
    if not os.path.exists(os.path.dirname(CACHE_FILE)):
      os.makedirs(os.path.dirname(CACHE_FILE))
    with open(CACHE_FILE, 'wb') as f:
      f.write(creds.to_json())
    return creds

  def _get_auth(self):  # pragma: no cover
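    """Returns cached credentials, running the OAuth flow on a cache miss."""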
    if not os.path.exists(CACHE_FILE):
      return self._auth()
    with open(CACHE_FILE, 'r') as f:
      return oauth2client.client.OAuth2Credentials.from_json(f.read())

  def _actually_init(self):  # pragma: no cover
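    """Authorizes an HTTP client and builds the BigQuery service object."""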
    self._creds = self._get_auth()
    self._http = self._creds.authorize(httplib2.Http())
    self._bq = discovery.build('bigquery', 'v2', http=self._http)

  def cat(self, targets, limit=None):
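    """Prints log lines for a target of [log_name] or [log_name, resource_id].

    Lines print oldest first; limit defaults to self.limit.
    """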
    assert len(targets) in (1, 2)
    log_name = targets[0]
    resource_id = None
    if len(targets) == 2:
      resource_id = targets[1]
    else:
      print >>sys.stderr, (
          'WARNING: Querying for all resources under a service, '
          'this may take a long time...')
    if not limit:  # pragma: no branch
      limit = self.limit
    tzoffset = datetime.timedelta(
        seconds=time.altzone if time.daylight else time.timezone)
    # Derive the printed offset from tzoffset so DST is handled consistently.
    offset = tzoffset.total_seconds() / 60 / 60
    print >>sys.stderr, (
        'NOTE: All times are in local system time (%g hour(s)).' % -offset)
    q = CAT_QUERY % (log_name, self.days_from, self.days_until)
    if resource_id:
      q += '\n WHERE labels.cloudtail_resource_id = "%s"' % resource_id
    # ORDER BY always comes after WHERE.
    q += ' ORDER BY timestamp DESC'
    # LIMIT has to be the last thing in the query.
    q += ' LIMIT %d' % limit
    LOGGER.debug(q)
    j = self._bq.jobs().query(
        projectId=PROJECT_ID,
        body={
            'query': q,
            'maxResults': 1000,
            'allowLargeResults': True}).execute()
    # TODO(hinoka): Retry with a lower limit if this 403's.
    # TODO(hinoka): Maybe timeout?
    if not j['jobComplete']:
      # If the job takes more than 10s to complete, then we have to
      # continually poll for the job, since BigQuery doesn't give us
      # partial results.
      job_id = j['jobReference']['jobId']
      while True:
        print >>sys.stderr, 'Still working... (Job ID: %s)' % job_id
        # Inner job.
        ij = self._bq.jobs().getQueryResults(
            projectId=PROJECT_ID, jobId=job_id,
            timeoutMs=1000, maxResults=1000).execute()
        if not ij['jobComplete']:  # pragma: no cover
          continue
        j = ij
        break
    lines = []
    while True:
      for row in j.get('rows', []):
        ts_f = row['f'][0]['v']  # BigQuery returns timestamps as float strings.
        line = row['f'][1]['v']  # textPayload
        resource = row['f'][2]['v']  # metadata.labels.value
        utc_ts = datetime.datetime.utcfromtimestamp(float(ts_f))
        local_ts = utc_ts - tzoffset
        local_ts_str = local_ts.strftime('%Y-%m-%d %I:%M:%S%p')
        if resource_id:
          lines.append('%s: %s' % (local_ts_str, line))
        else:
          lines.append('%s (%s): %s' % (local_ts_str, resource, line))
      if not j.get('pageToken'):  # pragma: no branch
        break
      # Should finish immediately.
      j = self._bq.jobs().getQueryResults(  # pragma: no cover
          projectId=PROJECT_ID, jobId=j['jobReference']['jobId'],
          maxResults=1000, pageToken=j['pageToken']).execute()
    for line in reversed(lines):
      print line

  def _list_logs_names(self):
    """List all of the log names in a project."""
    tables = []
    next_page = None
    while True:
      page = self._bq.tables().list(
          projectId=PROJECT_ID, datasetId=DATASET_ID,
          pageToken=next_page, maxResults=1000).execute()
      tables.extend(page['tables'])
      if not page.get('nextPageToken'):  # pragma: no branch
        break
      next_page = page['nextPageToken']  # pragma: no cover
    names = set()
    for table in tables:
      # 9 represents the _YYYYMMDD postfix of the table name.
      names.add(table['tableReference']['tableId'][:-9])
    for name in sorted(names):
      print name

  def _list_resources(self, log_name):
    """List all resources associated with a log name."""
    q = LIST_QUERY % (log_name, self.days_from, self.days_until)
    LOGGER.debug(q)
    j = self._bq.jobs().query(
        projectId=PROJECT_ID, body={'query': q}).execute()
    for row in sorted(j['rows'], key=lambda x: x['f'][0]['v']):
      print row['f'][0]['v']

  def list_logs(self, log_name):
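    """Prints resources under log_name, or all log names if it is falsy."""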
    if log_name:
      return self._list_resources(log_name)
    return self._list_logs_names()
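

# A minimal usage sketch (the wiring below is assumed for illustration; the
# real command-line entry point lives elsewhere and is not part of this
# file, and 'cloudtail'/'vm1' are made-up example targets):
#
#   q = LogQuery(PROJECT_ID, SERVICE_NAME, limit=100, days_from=-1,
#                days_until=0)
#   q._actually_init()           # Builds the authorized BigQuery client.
#   q.list_logs(None)            # Print every known log name.
#   q.cat(['cloudtail', 'vm1'])  # Print recent lines for one resource.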