pinpoint: Fix job refresh routine

This change ensures that we consistently fail jobs that have been
frozen, and that we update the associated bug and/or CLs.

R=abennetts@google.com, fancl@chromium.org

Bug: chromium:1197984
Change-Id: I9b8142aede5208585d5f004c08cec925f2183c7a
Reviewed-on: https://chromium-review.googlesource.com/c/catapult/+/2859366
Reviewed-by: Andrew Bennetts <abennetts@google.com>
Commit-Queue: Dean Berris <dberris@chromium.org>
diff --git a/dashboard/dashboard/pinpoint/handlers/refresh_jobs.py b/dashboard/dashboard/pinpoint/handlers/refresh_jobs.py
index a6ba8a1..4c63d90 100644
--- a/dashboard/dashboard/pinpoint/handlers/refresh_jobs.py
+++ b/dashboard/dashboard/pinpoint/handlers/refresh_jobs.py
@@ -48,16 +48,12 @@
   key = _JOB_CACHE_KEY % job_id
   info = layered_cache.Get(key) or {'retries': 0}
 
-  if info.get('retries') == _JOB_MAX_RETRIES:
+  retries = info.setdefault('retries', 0)
+  if retries >= _JOB_MAX_RETRIES:
     info['retries'] += 1
     layered_cache.Set(key, info, days_to_keep=30)
-    job.Fail(errors.REFRESH_FAILURE)
-    job.put()
-    logging.error('Failed retry for job %s', job_id)
-    return
-  elif info.get('retries') > _JOB_MAX_RETRIES:
-    logging.error('Exceeded maximum retries (%s) for job %s', _JOB_MAX_RETRIES,
-                  job_id)
+    job.Fail(errors.JobRetryFailed())
+    logging.error('Retry #%d: failed retrying job %s', retries, job_id)
     return
 
   info['retries'] += 1
diff --git a/dashboard/dashboard/pinpoint/handlers/refresh_jobs_test.py b/dashboard/dashboard/pinpoint/handlers/refresh_jobs_test.py
index 6de45f3..8a96c45 100644
--- a/dashboard/dashboard/pinpoint/handlers/refresh_jobs_test.py
+++ b/dashboard/dashboard/pinpoint/handlers/refresh_jobs_test.py
@@ -35,37 +35,31 @@
     job.started = True
     job.updated = datetime.datetime.utcnow() - datetime.timedelta(hours=8)
     job.put()
-    job._Schedule = mock.MagicMock()
-    job.Fail = mock.MagicMock()
     self.assertTrue(job.running)
     self.testapp.get('/cron/refresh-jobs')
     self.ExecuteDeferredTasks('default')
-    self.assertTrue(job._Schedule.called)
-    self.assertFalse(job.Fail.called)
+    job = job_module.JobFromId(job.job_id)
 
   def testGetWithQueuedJobs(self):
     queued_job = job_module.Job.New((), ())
-    queued_job._Schedule = mock.MagicMock()
-    queued_job.Fail = mock.MagicMock()
+    queued_job.put()
     running_job = job_module.Job.New((), ())
     running_job.task = '123'
     running_job.started = True
     running_job.updated = datetime.datetime.utcnow() - datetime.timedelta(
         hours=24)
     running_job.put()
-    running_job._Schedule = mock.MagicMock()
-    running_job.Fail = mock.MagicMock()
     self.assertFalse(queued_job.running)
     self.assertTrue(running_job.running)
     self.testapp.get('/cron/refresh-jobs')
+    running_job = job_module.JobFromId(running_job.job_id)
+    queued_job = job_module.JobFromId(queued_job.job_id)
     self.ExecuteDeferredTasks('default')
-    self.assertFalse(queued_job._Schedule.called)
-    self.assertTrue(running_job._Schedule.called)
+    self.assertFalse(queued_job.running)
+    self.assertTrue(running_job.running)
 
   def testGetWithCancelledJobs(self):
     cancelled_job = job_module.Job.New((), ())
-    cancelled_job._Schedule = mock.MagicMock()
-    cancelled_job.Fail = mock.MagicMock()
     cancelled_job.started = True
     cancelled_job.updated = datetime.datetime.utcnow() - datetime.timedelta(
         hours=24)
@@ -75,8 +69,10 @@
     self.assertFalse(cancelled_job.running)
     self.testapp.get('/cron/refresh-jobs')
     self.ExecuteDeferredTasks('default')
-    self.assertFalse(cancelled_job._Schedule.called)
-    self.assertFalse(cancelled_job.Fail.called)
+    cancelled_job = job_module.JobFromId(cancelled_job.job_id)
+    self.assertTrue(cancelled_job.cancelled)
+    self.assertFalse(cancelled_job.running)
+    self.assertEqual(cancelled_job.status, 'Cancelled')
 
   def testGet_RetryLimit(self):
     j1 = job_module.Job.New((), ())
@@ -120,8 +116,6 @@
     j2.started = True
     j2.updated = datetime.datetime.utcnow() - datetime.timedelta(hours=8)
     j2.put()
-    j2._Schedule = mock.MagicMock()
-    j2.Fail = mock.MagicMock()
 
     layered_cache.Set(refresh_jobs._JOB_CACHE_KEY % j2.job_id,
                       {'retries': refresh_jobs._JOB_MAX_RETRIES + 1})
@@ -129,9 +123,6 @@
     self.testapp.get('/cron/refresh-jobs')
 
     self.ExecuteDeferredTasks('default')
-
     self.assertFalse(j1._Schedule.called)
-    self.assertFalse(j1.Fail.called)
-
-    self.assertFalse(j2._Schedule.called)
-    self.assertFalse(j2.Fail.called)
+    j2 = job_module.JobFromId(j2.job_id)
+    self.assertEqual(j2.status, 'Failed')
diff --git a/dashboard/dashboard/pinpoint/models/job.py b/dashboard/dashboard/pinpoint/models/job.py
index 3c9aaf7..430e54f 100644
--- a/dashboard/dashboard/pinpoint/models/job.py
+++ b/dashboard/dashboard/pinpoint/models/job.py
@@ -586,16 +586,35 @@
         self.project,
         _retry_options=RETRY_OPTIONS)
 
-  def _UpdateGerritIfNeeded(self):
+  def _UpdateGerritIfNeeded(self, success=True):
     if self.gerrit_server and self.gerrit_change_id:
+      icon = _ROUND_PUSHPIN if success else _CRYING_CAT_FACE
+      state = 'complete' if success else 'failed'
       deferred.defer(
           _UpdateGerritDeferred,
           self.gerrit_server,
           self.gerrit_change_id,
-          '%s Job complete.\n\nSee results at: %s' % (_ROUND_PUSHPIN, self.url),
-          _retry_options=RETRY_OPTIONS)
+          '%s Job %s.\n\nSee results at: %s' % (icon, state, self.url),
+          _retry_options=RETRY_OPTIONS,
+      )
 
   def Fail(self, exception=None):
+    # Short-circuit jobs failure updates when we are not the first one to mark a
+    # job done.
+    first_done = MarkDone(self.job_id)
+    if self.use_execution_engine and not first_done:
+      return
+
+    # Set these explicitly on this instance, since we know that at this point
+    # the job has already been marked done in storage.
+    if not self.started:
+      self.started = True
+      self.started_time = datetime.datetime.utcnow()
+
+    self.done = True
+
+    # What follows are the details we are providing when posting updates to the
+    # associated bug.
     tb = traceback.format_exc() or ''
     title = _CRYING_CAT_FACE + ' Pinpoint job stopped with an error.'
     exc_info = sys.exc_info()
@@ -618,14 +637,10 @@
         'category': category,
     }
     self.task = None
+    self.put()
 
     comment = '\n'.join((title, self.url, '', exc_message))
 
-    # Short-circuit jobs failure updates when we are not the first one to mark a
-    # job done.
-    if self.use_execution_engine and not MarkDone(self.job_id):
-      return
-
     deferred.defer(
         _PostBugCommentDeferred,
         self.bug_id,
@@ -633,7 +648,9 @@
         project=self.project,
         labels=job_bug_update.ComputeLabelUpdates(['Pinpoint-Job-Failed']),
         send_email=True,
-        _retry_options=RETRY_OPTIONS)
+        _retry_options=RETRY_OPTIONS,
+    )
+    self._UpdateGerritIfNeeded(success=False)
     scheduler.Complete(self)
 
   def _Schedule(self, countdown=_TASK_INTERVAL):