# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import datetime
import logging
import os
import sys
import traceback
import uuid

from google.appengine.api import datastore_errors
from google.appengine.api import taskqueue
from google.appengine.ext import deferred
from google.appengine.ext import ndb
from google.appengine.runtime import apiproxy_errors

from dashboard.common import utils
from dashboard.pinpoint.models import change as change_module
from dashboard.pinpoint.models import errors
from dashboard.pinpoint.models import evaluators
from dashboard.pinpoint.models import event as event_module
from dashboard.pinpoint.models import job_bug_update
from dashboard.pinpoint.models import job_state
from dashboard.pinpoint.models import results2
from dashboard.pinpoint.models import scheduler
from dashboard.pinpoint.models import task as task_module
from dashboard.pinpoint.models import timing_record
from dashboard.pinpoint.models.evaluators import job_serializer
from dashboard.pinpoint.models.tasks import evaluator as task_evaluator
from dashboard.services import gerrit_service
from dashboard.services import issue_tracker_service

# We want this to be fast to minimize overhead while waiting for tasks to
# finish, but don't want to consume too many resources.
_TASK_INTERVAL = 60
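# The interval is in seconds; it is used as the task queue countdown for the
# polling task scheduled in _Schedule() below.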
_CRYING_CAT_FACE = u'\U0001f63f'
_INFINITY = u'\u221e'
_RIGHT_ARROW = u'\u2192'
_ROUND_PUSHPIN = u'\U0001f4cd'
_MAX_RECOVERABLE_RETRIES = 3
OPTION_STATE = 'STATE'
OPTION_TAGS = 'TAGS'
OPTION_ESTIMATE = 'ESTIMATE'
OPTION_INPUTS = 'INPUTS'
COMPARISON_MODES = job_state.COMPARISON_MODES
RETRY_OPTIONS = taskqueue.TaskRetryOptions(
task_retry_limit=8, min_backoff_seconds=2)
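# A sketch of the resulting schedule: up to 8 retry attempts, the first about
# 2 seconds after a failure, with the task queue's default doubling backing
# subsequent attempts off further.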
_FIRST_OR_LAST_FAILED_COMMENT = (
u"""One or both of the initial changes failed to produce any results.
Perhaps the job is misconfigured or the tests are broken? See the job
page for details.""")


def JobFromId(job_id):
"""Get a Job object from its ID.

  Its ID is just its key as a hex string.

  Users of Job should not have to import ndb. This function maintains an
abstraction layer that separates users from the Datastore details.
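
  Example (illustrative id value):
    job = JobFromId('1e240')  # equivalent to ndb.Key('Job', 0x1e240).get()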
"""
job_key = ndb.Key('Job', int(job_id, 16))
return job_key.get()


def IsDone(job_id):
"""Transactionally check whether a job is done executing.

  Returns True IFF the job is executing under the execution engine and its
  internal state is consistent with the job being "done".

  Raises any errors encountered in execution engine evaluation.
"""
job = JobFromId(job_id)
if not job.use_execution_engine:
return False
# This comes from an eventually consistent read, but we can treat that as a
# relaxed load -- if this ever is true, then that means a transaction before
# this call had already marked the job "done".
if job.done:
return True
try:
context = task_module.Evaluate(
job, event_module.SelectEvent(),
evaluators.DispatchByTaskType({
'find_culprit':
evaluators.TaskPayloadLiftingEvaluator(exclude_keys=['commits'])
}))
if not context:
return False
for payload in context.values():
status = payload.get('status')
if status in {'pending', 'ongoing'}:
return False
return True
except task_module.Error as error:
logging.error('Evaluation error: %s', error)
raise
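

# A typical call sequence (a sketch, not the only call site): a poller checks
# IsDone(job_id) and, once it returns True, calls MarkDone(job_id); the
# transaction guarantees that only one caller sees MarkDone return True.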
@ndb.transactional
def MarkDone(job_id):
"""Transactionally update the job as done.

  This only works for jobs executing under the execution engine.

  Returns True on a successful update.
"""
job = JobFromId(job_id)
if job.done:
return False
job.done = True
job.put()
return True


@ndb.transactional
def UpdateTime(job_id):
"""Transactionally updates the updated propery of a job."""
job = JobFromId(job_id)
job.updated = datetime.datetime.utcnow()
job.put()


class BenchmarkArguments(ndb.Model):
"""Structured version of the ad-hoc 'arguments' JSON for a Job.

  This class formalises the structure of the arguments passed into, and
supported by the Job model. This is intended to be used as a structured
property of Job, not a standalone entity.
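
  A minimal sketch of FromArgs below (hypothetical argument values):
    BenchmarkArguments.FromArgs({'benchmark': 'speedometer2'})
  returns an entity whose remaining fields are None, since dict.get defaults
  to None for missing keys.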
"""
benchmark = ndb.StringProperty(indexed=True)
story = ndb.StringProperty(indexed=True)
story_tags = ndb.StringProperty(indexed=True)
chart = ndb.StringProperty(indexed=True)
statistic = ndb.StringProperty(indexed=True)
@classmethod
def FromArgs(cls, args):
return cls(
benchmark=args.get('benchmark'),
story=args.get('story'),
story_tags=args.get('story_tags'),
chart=args.get('chart'),
statistic=args.get('statistic'),
)


def IsRunning(job):
  if not job.use_execution_engine:
    return bool(job.started and not job.cancelled and job.task)
return job.started and not job.completed


class Job(ndb.Model):
"""A Pinpoint job."""
state = ndb.PickleProperty(required=True, compressed=True)
#####
# Job arguments passed in through the API.
#####
# Request parameters.
arguments = ndb.JsonProperty(required=True)
bug_id = ndb.IntegerProperty()
project = ndb.StringProperty(default='chromium')
comparison_mode = ndb.StringProperty()
# The Gerrit server url and change id of the code review to update upon
# completion.
gerrit_server = ndb.StringProperty()
gerrit_change_id = ndb.StringProperty()
# User-provided name of the job.
name = ndb.StringProperty()
tags = ndb.JsonProperty()
# Email of the job creator.
user = ndb.StringProperty()
created = ndb.DateTimeProperty(required=True, auto_now_add=True)
# This differs from "created" since there may be a lag between the time it
# was queued and when the scheduler actually starts the job.
started_time = ndb.DateTimeProperty(required=False)
# Don't use `auto_now` for `updated`. When we do data migration, we need
# to be able to modify the Job without changing the Job's completion time.
updated = ndb.DateTimeProperty(required=True, auto_now_add=True)
started = ndb.BooleanProperty(default=True)
completed = ndb.ComputedProperty(lambda self: self.done or (
not self.use_execution_engine and self.started and not self.task))
failed = ndb.ComputedProperty(lambda self: bool(self.exception_details_dict))
running = ndb.ComputedProperty(IsRunning)
cancelled = ndb.BooleanProperty(default=False)
cancel_reason = ndb.TextProperty()
# Because of legacy reasons, `completed` is not exactly representative of
# whether a job is "done" from the execution engine's point of view. We're
# introducing this top-level property as a definitive flag, transactionally
# enforced, to signal whether we are done executing a job.
done = ndb.BooleanProperty(default=False)
# The name of the Task Queue task this job is running on. If it's present, the
# job is running. The task is also None for Task Queue retries.
task = ndb.StringProperty()
# The contents of any Exception that was thrown to the top level.
# If it's present, the job failed.
exception = ndb.TextProperty()
exception_details = ndb.JsonProperty()
difference_count = ndb.IntegerProperty()
retry_count = ndb.IntegerProperty(default=0)
# We expose the configuration as a first-class property of the Job.
configuration = ndb.ComputedProperty(
lambda self: self.arguments.get('configuration'))
# Pull out the benchmark, chart, and statistic as a structured property at the
# top-level, so that we can analyse these in a structured manner.
benchmark_arguments = ndb.StructuredProperty(BenchmarkArguments)
# Indicate whether we should evaluate this job's tasks through the execution
# engine.
use_execution_engine = ndb.BooleanProperty(default=False, indexed=False)
# Priority for scheduling purposes. Lower numbers indicate higher priority.
priority = ndb.IntegerProperty(default=0)
# Jobs can be part of batches, which have an id provided by the creator.
batch_id = ndb.StringProperty()
@classmethod
def _post_get_hook(cls, key, future): # pylint: disable=unused-argument
e = future.get_result()
if not e:
return
    if not getattr(e, 'exception_details', None):
e.exception_details = e.exception_details_dict
@property
def exception_details_dict(self):
if hasattr(self, 'exception_details'):
if self.exception_details:
return self.exception_details
if hasattr(self, 'exception'):
exc = self.exception
if exc:
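        # The final traceback line (e.g. 'ValueError: boom', an illustrative
        # value) serves as the short message; the full text is kept alongside.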
return {'message': exc.splitlines()[-1], 'traceback': exc}
return None
@classmethod
def New(cls,
quests,
changes,
arguments=None,
bug_id=None,
comparison_mode=None,
comparison_magnitude=None,
gerrit_server=None,
gerrit_change_id=None,
name=None,
pin=None,
tags=None,
user=None,
priority=None,
use_execution_engine=False,
project='chromium',
batch_id=None):
"""Creates a new Job, adds Changes to it, and puts it in the Datstore.
Args:
quests: An iterable of Quests for the Job to run.
changes: An iterable of the initial Changes to run on.
arguments: A dict with the original arguments used to start the Job.
bug_id: A monorail issue id number to post Job updates to.
comparison_mode: Either 'functional' or 'performance', which the Job uses
to figure out whether to perform a functional or performance bisect. If
None, the Job will not automatically add any Attempts or Changes.
comparison_magnitude: The estimated size of the regression or improvement
to look for. Smaller magnitudes require more repeats.
gerrit_server: Server of the Gerrit code review to update with job
results.
gerrit_change_id: Change id of the Gerrit code review to update with job
results.
name: The user-provided name of the Job.
pin: A Change (Commits + Patch) to apply to every Change in this Job.
tags: A dict of key-value pairs used to filter the Jobs listings.
user: The email of the Job creator.
priority: An integer indicating priority.
use_execution_engine: A bool indicating whether to use the experimental
execution engine. Currently defaulted to False, but will be switched to
True and eventually removed as an option later.
project: A Monorail project ID.
batch_id: An ID provided to link multiple jobs together.

    Returns:
A Job object.
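
    Example (a sketch; quests and changes come from the quest and change
    modules, and the argument values here are hypothetical):
      job = Job.New(quests, changes,
                    arguments={'configuration': 'linux-perf'},
                    comparison_mode='performance', user='user@example.com')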
"""
state = job_state.JobState(
quests,
comparison_mode=comparison_mode,
comparison_magnitude=comparison_magnitude,
pin=pin)
args = arguments or {}
job = cls(
state=state,
arguments=args,
bug_id=bug_id,
comparison_mode=comparison_mode,
gerrit_server=gerrit_server,
gerrit_change_id=gerrit_change_id,
name=name,
tags=tags,
user=user,
started=False,
cancelled=False,
use_execution_engine=use_execution_engine,
priority=priority,
project=project,
batch_id=batch_id,
)
# Pull out the benchmark arguments to the top-level.
job.benchmark_arguments = BenchmarkArguments.FromArgs(args)
if use_execution_engine:
# Short-circuit the process because we don't need any further processing
# here when we're using the execution engine.
job.put()
return job
for c in changes:
job.AddChange(c)
job.put()
# At this point we already have an ID, so we should go through each of the
# quests associated with the state, and provide the Job ID through a common
# API.
job.state.PropagateJob(job)
job.put()
return job
def PostCreationUpdate(self):
pending = 0
if self.configuration:
try:
pending = scheduler.QueueStats(self.configuration).get('queued_jobs', 0)
except (scheduler.QueueNotFound, ndb.BadRequestError) as e:
logging.warning('Error encountered fetching queue named "%s": %s ',
self.configuration, e)
bug_update = job_bug_update.JobUpdateBuilder(self).CreationUpdate(pending)
deferred.defer(
_PostBugCommentDeferred,
self.bug_id,
bug_update.comment_text,
project=self.project,
send_email=True,
_retry_options=RETRY_OPTIONS)
@property
def job_id(self):
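    # The ID is the entity's integer key rendered as lowercase hex: a key id
    # of 123456 (illustrative) becomes '1e240', which JobFromId() reverses
    # via int(job_id, 16).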
return '%x' % self.key.id()
@property
def status(self):
if self.failed:
return 'Failed'
if self.cancelled:
return 'Cancelled'
if self.completed:
return 'Completed'
if self.running:
return 'Running'
# By default, we assume that the Job is queued.
return 'Queued'
@property
def url(self):
host = os.environ['HTTP_HOST']
# TODO(crbug.com/939723): Remove this workaround when not needed.
if host == 'pinpoint.chromeperf.appspot.com':
host = 'pinpoint-dot-chromeperf.appspot.com'
return 'https://%s/job/%s' % (host, self.job_id)
@property
def results_url(self):
if not self.task:
url = results2.GetCachedResults2(self)
if url:
return url
# Point to the default status page if no results are available.
return '/results2/%s' % self.job_id
@property
def auto_name(self):
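    """A display name, derived from the comparison mode when no name is set.

    For example (hypothetical arguments), a performance-mode job with
    configuration 'linux-perf' and benchmark 'speedometer2' is named
    'Performance bisect on linux-perf/speedometer2'.
    """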
if self.name:
return self.name
if self.comparison_mode == job_state.FUNCTIONAL:
name = 'Functional bisect'
elif self.comparison_mode == job_state.PERFORMANCE:
name = 'Performance bisect'
else:
name = 'Try job'
if self.configuration:
name += ' on ' + self.configuration
if 'benchmark' in self.arguments:
name += '/' + self.arguments['benchmark']
return name
def AddChange(self, change):
self.state.AddChange(change)
def Start(self):
"""Starts the Job and updates it in the Datastore.

    This method is designed to return fast, so that Job creation is responsive
to the user. It schedules the Job on the task queue without running
anything. It also posts a bug comment, and updates the Datastore.
"""
if self.use_execution_engine:
# Treat this as if it's a poll, and run the handler here.
try:
task_module.Evaluate(
self,
event_module.Event(type='initiate', target_task=None, payload={}),
            task_evaluator.ExecutionEngine(self))
except task_module.Error as error:
logging.error('Failed: %s', error)
self.Fail()
self.put()
return
else:
self._Schedule()
self.started = True
self.started_time = datetime.datetime.now()
self.put()
title = _ROUND_PUSHPIN + ' Pinpoint job started.'
comment = '\n'.join((title, self.url))
deferred.defer(
_PostBugCommentDeferred,
self.bug_id,
comment,
labels=job_bug_update.ComputeLabelUpdates(['Pinpoint-Job-Started']),
project=self.project,
send_email=True,
_retry_options=RETRY_OPTIONS)
def _IsTryJob(self):
return not self.comparison_mode or self.comparison_mode == job_state.TRY
def _Complete(self):
logging.debug('Job [%s]: Completed', self.job_id)
if self.use_execution_engine:
scheduler.Complete(self)
# Only proceed if this thread/process is the one that successfully marked
# a job done.
if not MarkDone(self.job_id):
return
logging.debug('Job [%s]: Marked done', self.job_id)
if not self._IsTryJob() and not self.use_execution_engine:
self.difference_count = len(self.state.Differences())
try:
# TODO(dberris): Migrate results2 generation to tasks and evaluators.
results2.ScheduleResults2Generation(self)
except taskqueue.Error as e:
logging.debug('Failed ScheduleResults2Generation: %s', str(e))
self._FormatAndPostBugCommentOnComplete()
self._UpdateGerritIfNeeded()
scheduler.Complete(self)
def _FormatAndPostBugCommentOnComplete(self):
logging.debug('Processing outputs.')
if self._IsTryJob():
# There is no comparison metric.
title = '<b>%s Job complete. See results below.</b>' % _ROUND_PUSHPIN
deferred.defer(
_PostBugCommentDeferred,
self.bug_id,
'\n'.join((title, self.url)),
project=self.project,
labels=['Pinpoint-Tryjob-Completed'],
_retry_options=RETRY_OPTIONS)
return
# There is a comparison metric.
differences = []
result_values = {}
changes_examined = None
if not self.use_execution_engine:
differences = self.state.Differences()
for change_a, change_b in differences:
result_values.setdefault(change_a, self.state.ResultValues(change_a))
result_values.setdefault(change_b, self.state.ResultValues(change_b))
changes_examined = self.state.ChangesExamined()
else:
logging.debug('Execution Engine: Finding culprits.')
context = task_module.Evaluate(
self, event_module.SelectEvent(),
evaluators.Selector(
event_type='select',
include_keys={'culprits', 'change', 'result_values'}))
differences = [(change_module.ReconstituteChange(change_a),
change_module.ReconstituteChange(change_b))
for change_a, change_b in context.get(
'performance_bisection', {}).get('culprits', [])]
result_values = {
change_module.ReconstituteChange(v.get('change')):
v.get('result_values')
for v in context.values()
if 'change' in v and 'result_values' in v
}
if not differences:
# First, check if there are no differences because one side of bisection
# failed outright.
if self.state.FirstOrLastChangeFailed():
title = "<b>%s Job finished with errors.</b>" % _CRYING_CAT_FACE
comment = '\n'.join(
(title, self.url, '', _FIRST_OR_LAST_FAILED_COMMENT))
deferred.defer(
_PostBugCommentDeferred,
self.bug_id,
comment,
project=self.project,
labels=job_bug_update.ComputeLabelUpdates(['Pinpoint-Job-Failed']),
_retry_options=RETRY_OPTIONS)
return
# When we cannot find a difference, we want to not only update the issue
# with that (minimal) information but also automatically mark the issue
# WontFix. This is based on information we've gathered in production that
# most issues where we find Pinpoint cannot reproduce the difference end
# up invariably as "Unconfirmed" with very little follow-up.
title = "<b>%s Couldn't reproduce a difference.</b>" % _ROUND_PUSHPIN
deferred.defer(
_PostBugCommentDeferred,
self.bug_id,
'\n'.join((title, self.url)),
project=self.project,
labels=job_bug_update.ComputeLabelUpdates(
['Pinpoint-Job-Completed', 'Pinpoint-No-Repro']),
status='WontFix',
_retry_options=RETRY_OPTIONS)
return
# Collect the result values for each of the differences
bug_update_builder = job_bug_update.DifferencesFoundBugUpdateBuilder(
self.state.metric)
bug_update_builder.SetExaminedCount(changes_examined)
for change_a, change_b in differences:
values_a = result_values[change_a]
values_b = result_values[change_b]
bug_update_builder.AddDifference(change_b, values_a, values_b)
deferred.defer(
job_bug_update.UpdatePostAndMergeDeferred,
bug_update_builder,
self.bug_id,
self.tags,
self.url,
self.project,
_retry_options=RETRY_OPTIONS)
def _UpdateGerritIfNeeded(self, success=True):
if self.gerrit_server and self.gerrit_change_id:
icon = _ROUND_PUSHPIN if success else _CRYING_CAT_FACE
state = 'complete' if success else 'failed'
deferred.defer(
_UpdateGerritDeferred,
self.gerrit_server,
self.gerrit_change_id,
'%s Job %s.\n\nSee results at: %s' % (icon, state, self.url),
_retry_options=RETRY_OPTIONS,
)
def Fail(self, exception=None):
    # Short-circuit job failure updates when we are not the first caller to
    # mark the job done.
first_done = MarkDone(self.job_id)
if self.use_execution_engine and not first_done:
return
# Set these explicitly on this instance, since we know that at this point
# the job has already been marked done in storage.
if not self.started:
self.started = True
self.started_time = datetime.datetime.utcnow()
self.done = True
# What follows are the details we are providing when posting updates to the
# associated bug.
tb = traceback.format_exc() or ''
title = _CRYING_CAT_FACE + ' Pinpoint job stopped with an error.'
exc_info = sys.exc_info()
if exception is None:
if exc_info[1] is None:
        # We've been called without an exception in sys.exc_info or in our
        # args. This should not happen.
exception = errors.JobError('Unknown job error')
exception.category = 'pinpoint'
else:
exception = exc_info[1]
exc_message = exception.message
category = None
if isinstance(exception, errors.JobError):
category = exception.category
self.exception_details = {
'message': exc_message,
'traceback': tb,
'category': category,
}
self.task = None
self.put()
comment = '\n'.join((title, self.url, '', exc_message))
deferred.defer(
_PostBugCommentDeferred,
self.bug_id,
comment,
project=self.project,
labels=job_bug_update.ComputeLabelUpdates(['Pinpoint-Job-Failed']),
send_email=True,
_retry_options=RETRY_OPTIONS,
)
self._UpdateGerritIfNeeded(success=False)
scheduler.Complete(self)
def _Schedule(self, countdown=_TASK_INTERVAL):
# Set a task name to deduplicate retries. This adds some latency, but we're
# not latency-sensitive. If Job.Run() works asynchronously in the future,
# we don't need to worry about duplicate tasks.
# https://github.com/catapult-project/catapult/issues/3900
task_name = str(uuid.uuid4())
try:
task = taskqueue.add(
queue_name='job-queue',
url='/api/run/' + self.job_id,
name=task_name,
countdown=countdown)
except (apiproxy_errors.DeadlineExceededError,
taskqueue.TransientError) as exc:
raise errors.RecoverableError(exc)
self.task = task.name
def _MaybeScheduleRetry(self):
if not hasattr(self, 'retry_count') or self.retry_count is None:
self.retry_count = 0
if self.retry_count >= _MAX_RECOVERABLE_RETRIES:
return False
self.retry_count += 1
# Back off exponentially
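    # With _TASK_INTERVAL = 60 this schedules the retry 120, 240, then 480
    # seconds out for retry_count 1, 2, and 3 (the _MAX_RECOVERABLE_RETRIES
    # cap above).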
self._Schedule(countdown=_TASK_INTERVAL * (2**self.retry_count))
return True
def Run(self):
"""Runs this Job.

    Loops through all Attempts and checks the status of each one, kicking off
tasks as needed. Does not block to wait for all tasks to finish. Also
compares adjacent Changes' results and adds any additional Attempts or
Changes as needed. If there are any incomplete tasks, schedules another
Run() call on the task queue.
"""
self.exception_details = None # In case the Job succeeds on retry.
self.task = None # In case an exception is thrown.
try:
if self.use_execution_engine:
# Treat this as if it's a poll, and run the handler here.
context = task_module.Evaluate(
self,
event_module.Event(type='initiate', target_task=None, payload={}),
task_evaluator.ExecutionEngine(self))
result_status = context.get('performance_bisection', {}).get('status')
if result_status not in {'failed', 'completed'}:
return
if result_status == 'failed':
execution_errors = context['find_culprit'].get('errors', [])
if execution_errors:
self.exception_details = execution_errors[0]
self._Complete()
return
if not self._IsTryJob():
self.state.Explore()
work_left = self.state.ScheduleWork()
      # If there is work left, schedule another poll; otherwise finalize.
if work_left:
self._Schedule()
else:
self._Complete()
self.retry_count = 0
except errors.RecoverableError as e:
try:
if not self._MaybeScheduleRetry():
self.Fail(errors.JobRetryLimitExceededError(wrapped_exc=e))
except errors.RecoverableError as e:
self.Fail(errors.JobRetryFailed(wrapped_exc=e))
except BaseException:
self.Fail()
raise
finally:
# Don't use `auto_now` for `updated`. When we do data migration, we need
# to be able to modify the Job without changing the Job's completion time.
self.updated = datetime.datetime.now()
if self.completed:
timing_record.RecordJobTiming(self)
try:
self.put()
except (datastore_errors.Timeout,
datastore_errors.TransactionFailedError):
# Retry once.
self.put()
except datastore_errors.BadRequestError:
if self.task:
queue = taskqueue.Queue('job-queue')
queue.delete_tasks(taskqueue.Task(name=self.task))
self.task = None
# The _JobState is too large to fit in an ndb property.
# Load the Job from before we updated it, and fail it.
job = self.key.get(use_cache=False)
job.task = None
job.Fail()
job.updated = datetime.datetime.now()
job.put()
raise
def AsDict(self, options=None):
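    """Serializes the job for UI and API consumption.

    A sketch of intended use (the OPTION_* constants are defined at module
    scope):
      job.AsDict(options=[OPTION_STATE, OPTION_TAGS, OPTION_ESTIMATE])
    """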
def IsoFormatOrNone(attr):
time = getattr(self, attr, None)
if time:
return time.isoformat()
return None
d = {
'job_id': self.job_id,
'configuration': self.configuration,
'results_url': self.results_url,
'arguments': self.arguments,
'bug_id': self.bug_id,
'project': self.project,
'comparison_mode': self.comparison_mode,
'name': self.auto_name,
'user': self.user,
'created': IsoFormatOrNone('created'),
'updated': IsoFormatOrNone('updated'),
'started_time': IsoFormatOrNone('started_time'),
'difference_count': self.difference_count,
'exception': self.exception_details_dict,
'status': self.status,
'cancel_reason': self.cancel_reason,
'batch_id': self.batch_id,
}
if not options:
return d
if OPTION_STATE in options or OPTION_INPUTS in options:
if self.use_execution_engine:
d.update(
task_module.Evaluate(
self,
event_module.Event(
type='serialize', target_task=None, payload={}),
job_serializer.Serializer()) or {})
else:
d.update(self.state.AsDict(options))
if OPTION_ESTIMATE in options and not self.started:
d.update(self._GetRunTimeEstimate())
if OPTION_TAGS in options:
d['tags'] = {'tags': self.tags}
return d
def _GetRunTimeEstimate(self):
result = timing_record.GetSimilarHistoricalTimings(self)
if not result:
return {}
timings = [t.total_seconds() for t in result.timings]
return {
'estimate': {
'timings': timings,
'tags': result.tags
},
'queue_stats': scheduler.QueueStats(self.configuration)
}
def Cancel(self, user, reason):
# We cannot cancel an already cancelled job.
if self.cancelled:
logging.warning(
'Attempted to cancel a cancelled job "%s"; user = %s, reason = %s',
self.job_id, user, reason)
raise errors.CancelError('Job already cancelled.')
if not scheduler.Cancel(self):
raise errors.CancelError('Scheduler failed to cancel job.')
self.cancelled = True
self.cancel_reason = '{}: {}'.format(user, reason)
self.updated = datetime.datetime.now()
# Remove any "task" identifiers.
self.task = None
self.put()
title = _ROUND_PUSHPIN + ' Pinpoint job cancelled.'
comment = u'{}\n{}\n\nCancelled by {}, reason given: {}'.format(
title, self.url, user, reason)
deferred.defer(
_PostBugCommentDeferred,
self.bug_id,
comment,
project=self.project,
send_email=True,
labels=job_bug_update.ComputeLabelUpdates(['Pinpoint-Job-Cancelled']),
_retry_options=RETRY_OPTIONS)


def _PostBugCommentDeferred(bug_id, *args, **kwargs):
if not bug_id:
return
issue_tracker = issue_tracker_service.IssueTrackerService(
utils.ServiceAccountHttp())
issue_tracker.AddBugComment(bug_id, *args, **kwargs)


def _UpdateGerritDeferred(*args, **kwargs):
gerrit_service.PostChangeComment(*args, **kwargs)