pinpoint: Fix job refresh routine
This change ensures that we're able to consistently fail jobs that have
been frozen, and update the associated bug and/or CLs.
R=abennetts@google.com, fancl@chromium.org
Bug: chromium:1197984
Change-Id: I9b8142aede5208585d5f004c08cec925f2183c7a
Reviewed-on: https://chromium-review.googlesource.com/c/catapult/+/2859366
Reviewed-by: Andrew Bennetts <abennetts@google.com>
Commit-Queue: Dean Berris <dberris@chromium.org>
diff --git a/dashboard/dashboard/pinpoint/handlers/refresh_jobs.py b/dashboard/dashboard/pinpoint/handlers/refresh_jobs.py
index a6ba8a1..4c63d90 100644
--- a/dashboard/dashboard/pinpoint/handlers/refresh_jobs.py
+++ b/dashboard/dashboard/pinpoint/handlers/refresh_jobs.py
@@ -48,16 +48,12 @@
key = _JOB_CACHE_KEY % job_id
info = layered_cache.Get(key) or {'retries': 0}
- if info.get('retries') == _JOB_MAX_RETRIES:
+ retries = info.setdefault('retries', 0)
+ if retries >= _JOB_MAX_RETRIES:
info['retries'] += 1
layered_cache.Set(key, info, days_to_keep=30)
- job.Fail(errors.REFRESH_FAILURE)
- job.put()
- logging.error('Failed retry for job %s', job_id)
- return
- elif info.get('retries') > _JOB_MAX_RETRIES:
- logging.error('Exceeded maximum retries (%s) for job %s', _JOB_MAX_RETRIES,
- job_id)
+ job.Fail(errors.JobRetryFailed())
+ logging.error('Retry #%d: failed retrying job %s', retries, job_id)
return
info['retries'] += 1
diff --git a/dashboard/dashboard/pinpoint/handlers/refresh_jobs_test.py b/dashboard/dashboard/pinpoint/handlers/refresh_jobs_test.py
index 6de45f3..8a96c45 100644
--- a/dashboard/dashboard/pinpoint/handlers/refresh_jobs_test.py
+++ b/dashboard/dashboard/pinpoint/handlers/refresh_jobs_test.py
@@ -35,37 +35,31 @@
job.started = True
job.updated = datetime.datetime.utcnow() - datetime.timedelta(hours=8)
job.put()
- job._Schedule = mock.MagicMock()
- job.Fail = mock.MagicMock()
self.assertTrue(job.running)
self.testapp.get('/cron/refresh-jobs')
self.ExecuteDeferredTasks('default')
- self.assertTrue(job._Schedule.called)
- self.assertFalse(job.Fail.called)
+ job = job_module.JobFromId(job.job_id)
def testGetWithQueuedJobs(self):
queued_job = job_module.Job.New((), ())
- queued_job._Schedule = mock.MagicMock()
- queued_job.Fail = mock.MagicMock()
+ queued_job.put()
running_job = job_module.Job.New((), ())
running_job.task = '123'
running_job.started = True
running_job.updated = datetime.datetime.utcnow() - datetime.timedelta(
hours=24)
running_job.put()
- running_job._Schedule = mock.MagicMock()
- running_job.Fail = mock.MagicMock()
self.assertFalse(queued_job.running)
self.assertTrue(running_job.running)
self.testapp.get('/cron/refresh-jobs')
+ running_job = job_module.JobFromId(running_job.job_id)
+ queued_job = job_module.JobFromId(queued_job.job_id)
self.ExecuteDeferredTasks('default')
- self.assertFalse(queued_job._Schedule.called)
- self.assertTrue(running_job._Schedule.called)
+ self.assertFalse(queued_job.running)
+ self.assertTrue(running_job.running)
def testGetWithCancelledJobs(self):
cancelled_job = job_module.Job.New((), ())
- cancelled_job._Schedule = mock.MagicMock()
- cancelled_job.Fail = mock.MagicMock()
cancelled_job.started = True
cancelled_job.updated = datetime.datetime.utcnow() - datetime.timedelta(
hours=24)
@@ -75,8 +69,10 @@
self.assertFalse(cancelled_job.running)
self.testapp.get('/cron/refresh-jobs')
self.ExecuteDeferredTasks('default')
- self.assertFalse(cancelled_job._Schedule.called)
- self.assertFalse(cancelled_job.Fail.called)
+ cancelled_job = job_module.JobFromId(cancelled_job.job_id)
+ self.assertTrue(cancelled_job.cancelled)
+ self.assertFalse(cancelled_job.running)
+ self.assertEqual(cancelled_job.status, 'Cancelled')
def testGet_RetryLimit(self):
j1 = job_module.Job.New((), ())
@@ -120,8 +116,6 @@
j2.started = True
j2.updated = datetime.datetime.utcnow() - datetime.timedelta(hours=8)
j2.put()
- j2._Schedule = mock.MagicMock()
- j2.Fail = mock.MagicMock()
layered_cache.Set(refresh_jobs._JOB_CACHE_KEY % j2.job_id,
{'retries': refresh_jobs._JOB_MAX_RETRIES + 1})
@@ -129,9 +123,6 @@
self.testapp.get('/cron/refresh-jobs')
self.ExecuteDeferredTasks('default')
-
self.assertFalse(j1._Schedule.called)
- self.assertFalse(j1.Fail.called)
-
- self.assertFalse(j2._Schedule.called)
- self.assertFalse(j2.Fail.called)
+ j2 = job_module.JobFromId(j2.job_id)
+ self.assertEqual(j2.status, 'Failed')
diff --git a/dashboard/dashboard/pinpoint/models/job.py b/dashboard/dashboard/pinpoint/models/job.py
index 3c9aaf7..430e54f 100644
--- a/dashboard/dashboard/pinpoint/models/job.py
+++ b/dashboard/dashboard/pinpoint/models/job.py
@@ -586,16 +586,35 @@
self.project,
_retry_options=RETRY_OPTIONS)
- def _UpdateGerritIfNeeded(self):
+ def _UpdateGerritIfNeeded(self, success=True):
if self.gerrit_server and self.gerrit_change_id:
+ icon = _ROUND_PUSHPIN if success else _CRYING_CAT_FACE
+ state = 'complete' if success else 'failed'
deferred.defer(
_UpdateGerritDeferred,
self.gerrit_server,
self.gerrit_change_id,
- '%s Job complete.\n\nSee results at: %s' % (_ROUND_PUSHPIN, self.url),
- _retry_options=RETRY_OPTIONS)
+ '%s Job %s.\n\nSee results at: %s' % (icon, state, self.url),
+ _retry_options=RETRY_OPTIONS,
+ )
def Fail(self, exception=None):
+ # Short-circuit jobs failure updates when we are not the first one to mark a
+ # job done.
+ first_done = MarkDone(self.job_id)
+ if self.use_execution_engine and not first_done:
+ return
+
+ # Set these explicitly on this instance, since we know that at this point
+ # the job has already been marked done in storage.
+ if not self.started:
+ self.started = True
+ self.started_time = datetime.datetime.utcnow()
+
+ self.done = True
+
+ # What follows are the details we are providing when posting updates to the
+ # associated bug.
tb = traceback.format_exc() or ''
title = _CRYING_CAT_FACE + ' Pinpoint job stopped with an error.'
exc_info = sys.exc_info()
@@ -618,14 +637,10 @@
'category': category,
}
self.task = None
+ self.put()
comment = '\n'.join((title, self.url, '', exc_message))
- # Short-circuit jobs failure updates when we are not the first one to mark a
- # job done.
- if self.use_execution_engine and not MarkDone(self.job_id):
- return
-
deferred.defer(
_PostBugCommentDeferred,
self.bug_id,
@@ -633,7 +648,9 @@
project=self.project,
labels=job_bug_update.ComputeLabelUpdates(['Pinpoint-Job-Failed']),
send_email=True,
- _retry_options=RETRY_OPTIONS)
+ _retry_options=RETRY_OPTIONS,
+ )
+ self._UpdateGerritIfNeeded(success=False)
scheduler.Complete(self)
def _Schedule(self, countdown=_TASK_INTERVAL):