blob: c71d82030bb414d8ae7f3cc0cf03273629984606 [file] [log] [blame]
# Copyright (c) 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A collection of helpers to make lkgr_finder's life easier."""
# pylint: disable=line-too-long
# pylint: disable=unused-argument
import Queue
import ast
import collections
import datetime
import httplib2
import json
import logging
import os
import re
import requests
import smtplib
import socket
import subprocess
import sys
import threading
import time
import google.protobuf.message
import infra_libs
from infra_libs import luci_auth
from infra.libs import git
from infra import init_python_pb2 # pylint: disable=unused-import
from go.chromium.org.luci.buildbucket.proto import common_pb2
from go.chromium.org.luci.buildbucket.proto import rpc_pb2
class RunLogger(logging.Filter):
log = []
def filter(self, record):
RunLogger.log.append(
'%s: %s' % (datetime.datetime.now(), record.getMessage()))
return True
LOGGER = logging.getLogger(__name__)
LOGGER.addFilter(RunLogger())
##################################################
# Helper classes
##################################################
class STATUS(object):
"""Enum for holding possible build statuses."""
UNKNOWN, RUNNING, SUCCESS, FAILURE = range(4)
@staticmethod
def tostr(status): # pragma: no cover
return ['unknown', 'running', 'success', 'failure'][status]
class NOREV(object):
"""Singleton class to represent the wholesale lack of a revision."""
@staticmethod
def __str__(): # pragma: no cover
return '<No Revision>'
NOREV = NOREV()
##################################################
# VCS Wrappers
##################################################
class GitWrapper(object):
_status_path = '/git-lkgr'
_GIT_HASH_RE = re.compile('^[a-fA-F0-9]{40}$')
_GIT_POS_RE = re.compile('(\S+)@{#(\d+)}')
def __init__(self, url, path): # pragma: no cover
self._git = git.NewGit(url, path)
self._position_cache = {}
LOGGER.debug('Local git repository located at %s', self._git.path)
@property
def status_path(self): # pragma: no cover
return self._status_path
def check_rev(self, r): # pragma: no cover
if r is NOREV:
return False
return bool(self._GIT_HASH_RE.match(r))
def _cache(self, *revs): # pragma: no cover
unknown_revs = [r for r in revs if r not in self._position_cache]
positions = self._git.number(*unknown_revs)
# We know we only care about revisions along a single branch.
keys = []
for pos in positions:
match = self._GIT_POS_RE.match(pos or '')
if match:
key = (int(match.group(2)), match.group(1))
else:
key = None
keys.append(key)
self._position_cache.update(dict(zip(unknown_revs, keys)))
def keyfunc(self, r): # pragma: no cover
# Returns a tuple (commit-position-number, commit-position-ref).
if not self.check_rev(r):
return (-1, '')
k = self._position_cache.get(r)
if k is None:
self._cache(r)
k = self._position_cache.get(r)
if k is None:
return (-1, '')
return k
def sort(self, revisions, keyfunc=None): # pragma: no cover
keyfunc = keyfunc or (lambda x: x)
self._cache(*map(keyfunc, revisions))
return sorted(revisions, key=lambda x: self.keyfunc(keyfunc(x)))
def get_lag(self, r): # pragma: no cover
ts = self._git.show(r, '', '--format=format:%ct').split('\n', 1)[0].strip()
dt = datetime.datetime.utcfromtimestamp(float(ts))
return datetime.datetime.utcnow() - dt
def get_gap(self, revisions, r): # pragma: no cover
latest = self.sort(revisions)[-1]
return self.keyfunc(latest)[0] - self.keyfunc(r)[0]
##################################################
# Input Functions
##################################################
Build = collections.namedtuple(
'Build', ['number', 'result', 'revision'])
OAUTH_SCOPES = ['https://www.googleapis.com/auth/userinfo.email']
_BUILDBUCKET_SEARCH_ENDPOINT_V2 = (
'https://{buildbucket_instance}/prpc/buildbucket.v2.Builds/SearchBuilds')
_DEFAULT_BUILDBUCKET_INSTANCE = 'cr-buildbucket.appspot.com'
def _FetchFromBuildbucketImpl(
project, bucket_name, builder,
service_account_file=None): # pragma: no cover
request_pb = rpc_pb2.SearchBuildsRequest()
request_pb.predicate.builder.project = project
request_pb.predicate.builder.bucket = bucket_name
request_pb.predicate.builder.builder = builder
request_pb.predicate.status = common_pb2.ENDED_MASK
request_pb.fields.paths.extend([
'builds.*.number',
'builds.*.status',
'builds.*.input.gitiles_commit.id',
])
headers = {
'Accept': 'application/prpc; encoding=binary',
'Content-Type': 'application/prpc; encoding=binary',
}
http = httplib2.Http(timeout=300)
creds = None
if service_account_file:
creds = infra_libs.get_signed_jwt_assertion_credentials(
service_account_file, scope=OAUTH_SCOPES)
elif luci_auth.available():
creds = luci_auth.LUCICredentials(scopes=OAUTH_SCOPES)
if creds:
creds.authorize(http)
resp, content = http.request(
_BUILDBUCKET_SEARCH_ENDPOINT_V2.format(
buildbucket_instance=_DEFAULT_BUILDBUCKET_INSTANCE),
method='POST',
headers=headers,
body=request_pb.SerializeToString())
grpc_code = resp.get('X-Prpc-Grpc-Code'.lower())
if grpc_code != '0':
raise httplib2.HttpLib2Error('Invalid GRPC exit code: %s\n%s' % (
grpc_code, content))
response_pb = rpc_pb2.SearchBuildsResponse()
response_pb.ParseFromString(content)
return response_pb
_BUILDBUCKET_STATUS = {
common_pb2.CANCELED: STATUS.UNKNOWN,
common_pb2.FAILURE: STATUS.FAILURE,
common_pb2.INFRA_FAILURE: STATUS.FAILURE,
common_pb2.SUCCESS: STATUS.SUCCESS,
}
def FetchBuildbucketBuildsForBuilder(
bucket, builder, service_account_file=None):
LOGGER.debug('Fetching builds for %s/%s from buildbucket', bucket, builder)
if not '/' in bucket:
LOGGER.error(
'Unexpected bucket "%s". '
+ 'Buckets should be specified as $PROJECT/$BUCKET_NAME.',
bucket)
return None
project, bucket_name = bucket.split('/', 1)
try:
response_pb = _FetchFromBuildbucketImpl(
project, bucket_name, builder,
service_account_file=service_account_file)
except httplib2.HttpLib2Error as e:
LOGGER.error(
'RequestException while fetching %s/%s/%s:\n%s',
project, bucket_name, builder, repr(e))
return None
except google.protobuf.message.Error as e:
LOGGER.error(
'Unknown protobuf error while fetching %s/%s/%s:\n%s',
project, bucket_name, builder, repr(e))
return None
builds = []
for build_pb in response_pb.builds:
number = build_pb.number
result = _BUILDBUCKET_STATUS.get(build_pb.status)
revision = build_pb.input.gitiles_commit.id
if bool(number) and bool(revision) and result is not None:
builds.append(Build(number, result, revision))
return builds
def FetchBuildsWorker(fetch_q, fetch_fn): # pragma: no cover
"""Pull build json from builders.
Args:
@param fetch_q: A pre-populated Queue.Queue containing tuples of:
bucket: Buildbucket bucket containing the builder.
builder: Name of the builder in that bucket.
output_builds: Output dictionary of builder to build data.
@type fetch_q: tuple
"""
while True:
try:
bucket, builder, service_account, output_builds = fetch_q.get(False)
except Queue.Empty:
return
output_builds[builder] = fetch_fn(
bucket, builder, service_account_file=service_account)
def FetchBuildbucketBuilds(
buckets, max_threads=0, service_account=None): # pragma: no cover
"""Fetch all build data about the builders from the given buckets.
Args:
@param buckets: Dictionary of the form
{ bucket: {
builders: [list of strings]
} }
This dictionary is a subset of the project configuration json.
@type buckets: dict
@param max_threads: Maximum number of parallel requests.
@type max_threads: int
"""
return _FetchBuilds(
buckets, FetchBuildbucketBuildsForBuilder,
max_threads=max_threads, service_account=service_account)
def _FetchBuilds(
config, fetch_fn, max_threads=0, service_account=None): # pragma: no cover
build_data = {key: {} for key in config}
fetch_q = Queue.Queue()
for key, config_data in config.iteritems():
builders = config_data['builders']
for builder in builders:
fetch_q.put((key, builder, service_account, build_data[key]))
fetch_threads = set()
if not max_threads:
max_threads = fetch_q.qsize()
for _ in xrange(max_threads):
th = threading.Thread(target=FetchBuildsWorker,
args=(fetch_q, fetch_fn))
th.start()
fetch_threads.add(th)
for th in fetch_threads:
th.join()
failures = 0
for key, builders in build_data.iteritems():
for builder, builds in builders.iteritems():
if builds is None:
failures += 1
LOGGER.error('Failed to fetch builds for %s:%s' % (key, builder))
return build_data, failures
_BUILD_DATA_VERSION = 2
def LoadBuilds(filename):
"""Read all build data from a file or stdin."""
fh = sys.stdin if filename == '-' else open(filename, 'r')
with fh:
wrapped_builds = json.load(fh)
if wrapped_builds.get('version') != _BUILD_DATA_VERSION:
return None
builds = wrapped_builds.get('builds', {})
for key, val in builds.iteritems():
for builder, builder_data in val.iteritems():
builds[key][builder] = [Build(*b) for b in builder_data]
return builds
def DumpBuilds(builds, filename):
"""Dump all build data to a file."""
wrapped_builds = {
'builds': builds,
'version': _BUILD_DATA_VERSION,
}
with open(filename, 'w') as fh:
json.dump(wrapped_builds, fh, indent=2)
##################################################
# Data Processing
##################################################
def CollateRevisionHistory(builds, repo):
"""Sorts builds and revisions in repository order.
Args:
builds: a dict of the form:
```
builds := {
master: {
builder: [Build, ...],
...,
},
...
}
```
repo (GitWrapper): repository in which the revision occurs.
Returns:
A 2-tuple of (build_history, revisions), where:
```
build_history := {
master: {
builder: [Build, ...],
...,
},
...
}
```
and
```
revisions := [revision, ...]
```
"""
build_history = {}
revisions = set()
for category, category_data in builds.iteritems():
LOGGER.debug('Collating category %s', category)
category_history = build_history.setdefault(category, {})
for builder, builder_data in category_data.iteritems():
LOGGER.debug('Collating builder %s', builder)
for build in builder_data:
revisions.add(str(build.revision))
category_history[builder] = repo.sort(
builder_data, keyfunc=lambda b: b.revision)
revisions = repo.sort(revisions)
return (build_history, revisions)
def FindLKGRCandidate(build_history, revisions, revkey, status_gen=None):
"""Find an lkgr candidate.
This function performs the meat of the algorithm described in the module
docstring. It walks backwards through the revisions, searching for a
revision which has the SUCCESS status on every builder.
Returns:
A single revision (string) chosen as the new LKGR candidate.
Args:
build_history: A dict of build data, as from CollateRevisionHistory
revisions: A list of revisions/commits that were built
revkey: Keyfunc to map each revision to a sortable key
revcmp: A comparator to sort revisions/commits
status_gen: An instance of StatusGenerator to output status information
"""
def lowercase_key(item_pair):
return item_pair[0].lower()
lkgr = None
builders = []
for category, category_history in sorted(build_history.items(),
key=lowercase_key):
status_gen.category_cb(category)
for builder, builder_history in sorted(category_history.items(),
key=lowercase_key):
status_gen.builder_cb(builder)
gen = reversed(builder_history)
prev = []
try:
prev.append(gen.next())
except StopIteration:
prev.append(Build(-1, STATUS.UNKNOWN, NOREV))
builders.append((category, builder, gen, prev))
for revision in reversed(revisions):
status_gen.revision_cb(revision)
good_revision = True
for category, builder, gen, prev in builders:
try:
while revkey(revision) < revkey(prev[-1].revision):
prev.append(gen.next())
except StopIteration: # pragma: no cover
prev.append(Build(-1, STATUS.UNKNOWN, NOREV))
# current build matches revision
if revkey(revision) == revkey(prev[-1].revision):
status = prev[-1].result
elif len(prev) == 1:
assert revkey(revision) > revkey(prev[-1].revision)
# most recent build is behind revision
status = STATUS.UNKNOWN
elif prev[-1].result == STATUS.UNKNOWN: # pragma: no cover
status = STATUS.UNKNOWN
else:
# We color space between FAILED and INPROGRESS builds as FAILED,
# since that is what it will eventually become.
if (prev[-1].result == STATUS.SUCCESS
and prev[-2].result == STATUS.RUNNING): # pragma: no cover
status = STATUS.RUNNING
elif prev[-1].result == prev[-2].result == STATUS.SUCCESS:
status = STATUS.SUCCESS
else:
status = STATUS.FAILURE
build_num = None
if revkey(revision) == revkey(prev[-1].revision):
build_num = prev[-1].number
status_gen.build_cb(category, builder, status, build_num)
if status != STATUS.SUCCESS:
good_revision = False
if not lkgr and good_revision:
lkgr = revision
status_gen.lkgr_cb(revision)
return lkgr
def CheckLKGRLag(lag_age, rev_gap, allowed_lag_hrs, allowed_rev_gap):
"""Determine if the LKGR lag is acceptable for current commit activity.
Returns True if the lag is within acceptable thresholds.
"""
# Lag isn't an absolute threshold because when things are slow, e.g. nights
# and weekends, there could be bad revisions that don't get noticed and
# fixed right away, so LKGR could go a long time without updating, but it
# wouldn't be a big concern, so we want to back off the 'ideal' threshold.
# When the tree is active, we don't want to back off much, or at all, to keep
# the lag under control.
if rev_gap == 0:
return True
lag_hrs = (lag_age.days * 24) + (lag_age.seconds / 3600)
if not lag_hrs:
return True
rev_rate = rev_gap / lag_hrs
# This causes the allowed_lag to back off proportionally to how far LKGR is
# below the gap threshold, roughly throttled by the rate of commits since the
# last LKGR.
# Equation arbitrarily chosen to fit the range of 2 to 22 hours when using the
# default allowed_lag and allowed_gap. Might need tweaking.
max_lag_hrs = ((1 + max(0, allowed_rev_gap - rev_gap) /
min(30, max(15, rev_rate))) * allowed_lag_hrs)
LOGGER.debug('LKGR is %s hours old (threshold: %s hours)' %
(lag_hrs, max_lag_hrs))
return lag_age < datetime.timedelta(hours=max_lag_hrs)
##################################################
# Output Functions
##################################################
def SendMail(recipients, subject, message, dry): # pragma: no cover
if dry:
LOGGER.info('Dry-run: Not sending mail with subject: "%s"', subject)
return
LOGGER.info('Sending mail with subject: "%s"', subject)
try:
sender = 'lkgr_finder@%s' % socket.getfqdn()
body = ['From: %s' % sender]
body.append('To: %s' % recipients)
body.append('Subject: lkgr_finder: %s' % subject)
# Default to sending replies to the recipient list, not the account running
# the script, since that's probably just a role account.
body.append('Reply-To: %s' % recipients)
body.append('')
body.append(message)
# TODO(pgervais,crbug.com/455436): send this to sheriff-o-matic instead.
server = smtplib.SMTP('localhost')
server.sendmail(sender, recipients.split(','), '\n'.join(body))
server.quit()
except Exception as e:
# If smtp fails, just dump the output. If running under cron, that will
# capture the output and send its own (ugly, but better than nothing) email.
print message
print ('\n--------- Exception in %s -----------\n' %
os.path.basename(__file__))
raise e
def UpdateTag(new_lkgr, repo, dry): # pragma: no cover
"""Update the lkgr tag in the repository. Git only.
Args:
new_lkgr: the new commit hash for the lkgr tag to point to.
repo: instance of GitWrapper
dry: if True, don't actually update the tag.
"""
LOGGER.info('Updating lkgr tag')
push_cmd = ['push', 'origin', '%s:refs/tags/lkgr' % new_lkgr]
try:
if dry:
LOGGER.debug('Dry-run: Not pushing lkgr: %s', ' '.join(push_cmd))
else:
LOGGER.debug('Pushing lkgr: %s', ' '.join(push_cmd))
repo._git(push_cmd) # pylint: disable=W0212
except subprocess.CalledProcessError:
LOGGER.error('Failed to push new lkgr tag.')
def WriteLKGR(lkgr, filename, dry): # pragma: no cover
"""Write the lkgr to a file.
Args:
lkgr: the lkgr to write.
filename: the path to the file to write to.
dry: if True, don't actually write the file.
"""
LOGGER.info('Writing lkgr to file.')
path = os.path.abspath(filename)
if dry:
LOGGER.debug('Dry-run: Not writing lkgr to file at %s', path)
return
LOGGER.info('Writing lkgr to file at %s', path)
with open(path, 'w') as f:
f.write(str(lkgr))
def ReadLKGR(filename): # pragma: no cover
"""Read the lkgr from a file.
Args:
filename: the path to the file to read from.
"""
path = os.path.abspath(filename)
LOGGER.info('Reading lkgr from file at %s', path)
try:
with open(path, 'r') as f:
return f.read().strip()
except IOError:
return None
def WriteHTML(status_gen, filename, dry): # pragma: no cover
"""Write the html status to a file.
Args:
status_gen: populated instance of HTMLStatusGenerator
filename: the path to the file to write to.
dry: if True, don't actually write the file.
"""
LOGGER.info('Writing html status to file.')
path = os.path.abspath(filename)
if dry:
LOGGER.debug('Dry-run: Not writing html status to file at %s', path)
return
LOGGER.info('Writing html status to file at %s', path)
with open(path, 'w') as f:
f.write(status_gen.generate())
##################################################
# Processing logic
##################################################
def GetProjectConfig(project): # pragma: no cover
"""Get and combine default and project-specific configuration."""
try:
config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'config', 'default_cfg.pyl')
config = ast.literal_eval(open(config_file).read())
except (IOError, ValueError):
LOGGER.fatal('Could not read default configuration file.')
raise
try:
config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'config', '%s_cfg.pyl' % project)
config.update(ast.literal_eval(open(config_file).read()))
except (IOError, ValueError):
LOGGER.fatal('Could not read project configuration file. Does it exist?')
raise
return config